diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2018-01-12 10:16:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-12 10:16:22 +0100 |
commit | c9ec2c0888271491eaf425721a72736392f85945 (patch) | |
tree | 8ee3fe7e6fe56bed7bbfa7c8537add5f24afe12a /src | |
parent | b0b4555f4ca720e626f42855f2257bf598e1bf74 (diff) |
Revert "Stop using std::thread in C++ library since it can trigger exceptions"
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/client/secure_credentials.cc | 14 | ||||
-rw-r--r-- | src/cpp/server/create_default_thread_pool.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/dynamic_thread_pool.cc | 54 | ||||
-rw-r--r-- | src/cpp/server/dynamic_thread_pool.h | 20 | ||||
-rw-r--r-- | src/cpp/server/secure_server_credentials.cc | 7 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 7 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 49 | ||||
-rw-r--r-- | src/cpp/server/thread_pool_interface.h | 4 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 54 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 29 |
10 files changed, 60 insertions, 180 deletions
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 94519d817b..4fb128d98b 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -189,16 +189,10 @@ int MetadataCredentialsPluginWrapper::GetMetadata( } if (w->plugin_->IsBlocking()) { // Asynchronous return. - if (w->thread_pool_->Add(std::bind( - &MetadataCredentialsPluginWrapper::InvokePlugin, w, context, cb, - user_data, nullptr, nullptr, nullptr, nullptr))) { - return 0; - } else { - *num_creds_md = 0; - *status = GRPC_STATUS_RESOURCE_EXHAUSTED; - *error_details = nullptr; - return true; - } + w->thread_pool_->Add( + std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, context, + cb, user_data, nullptr, nullptr, nullptr, nullptr)); + return 0; } else { // Synchronous return. w->InvokePlugin(context, cb, user_data, creds_md, num_creds_md, status, diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index 2d2abbe9d1..8ca3e32c2f 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -28,7 +28,7 @@ namespace { ThreadPoolInterface* CreateDefaultThreadPoolImpl() { int cores = gpr_cpu_num_cores(); if (!cores) cores = 4; - return new DynamicThreadPool(cores, gpr_thd_new, gpr_thd_join); + return new DynamicThreadPool(cores); } CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl; diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index d0e62313f6..81c78fe739 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -19,32 +19,19 @@ #include "src/cpp/server/dynamic_thread_pool.h" #include <mutex> +#include <thread> #include <grpc/support/log.h> -#include <grpc/support/thd.h> namespace grpc { -DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool, - bool* valid) - : pool_(pool) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - - std::lock_guard<std::mutex> l(dt_mu_); - valid_ = *valid = pool->thread_creator_( - &thd_, "dynamic thread", - [](void* th) { - reinterpret_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc(); - }, - this, &opt); -} - +DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) + : pool_(pool), + thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, + this)) {} DynamicThreadPool::DynamicThread::~DynamicThread() { - std::lock_guard<std::mutex> l(dt_mu_); - if (valid_) { - pool_->thread_joiner_(thd_); - } + thd_->join(); + thd_.reset(); } void DynamicThreadPool::DynamicThread::ThreadFunc() { @@ -86,26 +73,15 @@ void DynamicThreadPool::ThreadFunc() { } } -DynamicThreadPool::DynamicThreadPool( - int reserve_threads, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner) +DynamicThreadPool::DynamicThreadPool(int reserve_threads) : shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), - threads_waiting_(0), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) { + threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { std::lock_guard<std::mutex> lock(mu_); nthreads_++; - bool valid; - auto* th = new DynamicThread(this, &valid); - if (!valid) { - delete th; - } + new DynamicThread(this); } } @@ -125,7 +101,7 @@ DynamicThreadPool::~DynamicThreadPool() { ReapThreads(&dead_threads_); } -bool DynamicThreadPool::Add(const std::function<void()>& callback) { +void DynamicThreadPool::Add(const std::function<void()>& callback) { std::lock_guard<std::mutex> lock(mu_); // Add works to the callbacks list callbacks_.push(callback); @@ -133,12 +109,7 @@ bool DynamicThreadPool::Add(const std::function<void()>& callback) { if (threads_waiting_ == 0) { // Kick off a new thread nthreads_++; - bool valid; - auto* th = new DynamicThread(this, &valid); - if (!valid) { - delete th; - return false; - } + new DynamicThread(this); } else { cv_.notify_one(); } @@ -146,7 +117,6 @@ bool DynamicThreadPool::Add(const std::function<void()>& callback) { if (!dead_threads_.empty()) { ReapThreads(&dead_threads_); } - return true; } } // namespace grpc diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h index 75d31cd908..9237c6e5ca 100644 --- a/src/cpp/server/dynamic_thread_pool.h +++ b/src/cpp/server/dynamic_thread_pool.h @@ -24,9 +24,9 @@ #include <memory> #include <mutex> #include <queue> +#include <thread> #include <grpc++/support/config.h> -#include <grpc/support/thd.h> #include "src/cpp/server/thread_pool_interface.h" @@ -34,26 +34,20 @@ namespace grpc { class DynamicThreadPool final : public ThreadPoolInterface { public: - DynamicThreadPool(int reserve_threads, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), - void*, const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner); + explicit DynamicThreadPool(int reserve_threads); ~DynamicThreadPool(); - bool Add(const std::function<void()>& callback) override; + void Add(const std::function<void()>& callback) override; private: class DynamicThread { public: - DynamicThread(DynamicThreadPool* pool, bool* valid); + DynamicThread(DynamicThreadPool* pool); ~DynamicThread(); private: DynamicThreadPool* pool_; - std::mutex dt_mu_; - gpr_thd_id thd_; - bool valid_; + std::unique_ptr<std::thread> thd_; void ThreadFunc(); }; std::mutex mu_; @@ -65,10 +59,6 @@ class DynamicThreadPool final : public ThreadPoolInterface { int nthreads_; int threads_waiting_; std::list<DynamicThread*> dead_threads_; - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator_; - std::function<void(gpr_thd_id)> thread_joiner_; void ThreadFunc(); static void ReapThreads(std::list<DynamicThread*>* tlist); diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index fa08a6200f..0fbe4ccd18 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -43,14 +43,9 @@ void AuthMetadataProcessorAyncWrapper::Process( return; } if (w->processor_->IsBlocking()) { - bool added = w->thread_pool_->Add( + w->thread_pool_->Add( std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w, context, md, num_md, cb, user_data)); - if (!added) { - // no thread available, so fail with temporary resource unavailability - cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_UNAVAILABLE, nullptr); - return; - } } else { // invoke directly. w->InvokeProcessor(context, md, num_md, cb, user_data); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index d91ee7f4e3..200e477822 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -23,7 +23,6 @@ #include <grpc++/server.h> #include <grpc/support/cpu.h> #include <grpc/support/log.h> -#include <grpc/support/thd.h> #include <grpc/support/useful.h> #include "src/cpp/server/thread_pool_interface.h" @@ -44,9 +43,7 @@ ServerBuilder::ServerBuilder() max_send_message_size_(-1), sync_server_settings_(SyncServerSettings()), resource_quota_(nullptr), - generic_service_(nullptr), - thread_creator_(gpr_thd_new), - thread_joiner_(gpr_thd_join) { + generic_service_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); for (auto it = g_plugin_factory_list->begin(); it != g_plugin_factory_list->end(); it++) { @@ -265,7 +262,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> server(new Server( max_receive_message_size_, &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec, thread_creator_, thread_joiner_)); + sync_server_settings_.cq_timeout_msec)); if (has_sync_methods) { // This is a Sync server diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 02a663d660..4f8f4e06fc 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -36,7 +36,6 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/thd.h> #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/profiling/timers.h" @@ -196,10 +195,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - method_(mrd->method_), - server_(server) { + request_payload_(mrd->request_payload_), + method_(mrd->method_) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -213,13 +210,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } } - void Run(std::shared_ptr<GlobalCallbacks> global_callbacks, - bool resources) { + void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - auto* handler = resources ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler(internal::MethodHandler::HandlerParameter( + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; @@ -241,7 +235,6 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { const bool has_request_payload_; grpc_byte_buffer* request_payload_; internal::RpcServiceMethod* const method_; - Server* server_; }; private: @@ -262,15 +255,11 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { // appropriate RPC handlers class Server::SyncRequestThreadManager : public ThreadManager { public: - SyncRequestThreadManager( - Server* server, CompletionQueue* server_cq, - std::shared_ptr<GlobalCallbacks> global_callbacks, int min_pollers, - int max_pollers, int cq_timeout_msec, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner) - : ThreadManager(min_pollers, max_pollers, thread_creator, thread_joiner), + SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr<GlobalCallbacks> global_callbacks, + int min_pollers, int max_pollers, + int cq_timeout_msec) + : ThreadManager(min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -296,7 +285,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok, bool resources) override { + void DoWork(void* tag, bool ok) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -316,7 +305,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_, resources); + cd.Run(global_callbacks_); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -378,11 +367,7 @@ Server::Server( int max_receive_message_size, ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner) + int min_pollers, int max_pollers, int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(sync_server_cqs), started_(false), @@ -391,9 +376,7 @@ Server::Server( has_generic_service_(false), server_(nullptr), server_initializer_(new ServerInitializer(this)), - health_check_service_disabled_(false), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) { + health_check_service_disabled_(false) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; @@ -403,7 +386,7 @@ Server::Server( it++) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( this, (*it).get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec, thread_creator_, thread_joiner_)); + sync_cq_timeout_msec)); } grpc_channel_args channel_args; @@ -566,10 +549,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } - if (!sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); - } - for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h index 656e6673f1..028842a776 100644 --- a/src/cpp/server/thread_pool_interface.h +++ b/src/cpp/server/thread_pool_interface.h @@ -29,9 +29,7 @@ class ThreadPoolInterface { virtual ~ThreadPoolInterface() {} // Schedule the given callback for execution. - // Return true on success, false on failure - virtual bool Add(const std::function<void()>& callback) - GRPC_MUST_USE_RESULT = 0; + virtual void Add(const std::function<void()>& callback) = 0; }; // Allows different codebases to use their own thread pool impls diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 107c60f4eb..23264f1b5b 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -20,26 +20,18 @@ #include <climits> #include <mutex> +#include <thread> #include <grpc/support/log.h> -#include <grpc/support/thd.h> namespace grpc { -ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid) +ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) : thd_mgr_(thd_mgr) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - // Make thread creation exclusive with respect to its join happening in // ~WorkerThread(). std::lock_guard<std::mutex> lock(wt_mu_); - *valid = valid_ = thd_mgr->thread_creator_( - &thd_, "worker thread", - [](void* th) { - reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run(); - }, - this, &opt); + thd_ = std::thread(&ThreadManager::WorkerThread::Run, this); } void ThreadManager::WorkerThread::Run() { @@ -50,24 +42,15 @@ void ThreadManager::WorkerThread::Run() { ThreadManager::WorkerThread::~WorkerThread() { // Don't join until the thread is fully constructed. std::lock_guard<std::mutex> lock(wt_mu_); - if (valid_) { - thd_mgr_->thread_joiner_(thd_); - } + thd_.join(); } -ThreadManager::ThreadManager( - int min_pollers, int max_pollers, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner) +ThreadManager::ThreadManager(int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) {} + num_threads_(0) {} ThreadManager::~ThreadManager() { { @@ -128,9 +111,7 @@ void ThreadManager::Initialize() { for (int i = 0; i < min_pollers_; i++) { // Create a new thread (which ends up calling the MainWorkLoop() function - bool valid; - new WorkerThread(this, &valid); - GPR_ASSERT(valid); // we need to have at least this minimum + new WorkerThread(this); } } @@ -157,27 +138,18 @@ void ThreadManager::MainWorkLoop() { case WORK_FOUND: // If we got work and there are now insufficient pollers, start a new // one - bool resources; if (!shutdown_ && num_pollers_ < min_pollers_) { - bool valid; + num_pollers_++; + num_threads_++; // Drop lock before spawning thread to avoid contention lock.unlock(); - auto* th = new WorkerThread(this, &valid); - lock.lock(); - if (valid) { - num_pollers_++; - num_threads_++; - } else { - delete th; - } - resources = (num_pollers_ > 0); + new WorkerThread(this); } else { - resources = true; + // Drop lock for consistency with above branch + lock.unlock(); } - // Drop lock before any application work - lock.unlock(); // Lock is always released at this point - do the application work - DoWork(tag, ok, resources); + DoWork(tag, ok); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index c1783baa60..a206e0bd8a 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -20,23 +20,18 @@ #define GRPC_INTERNAL_CPP_THREAD_MANAGER_H #include <condition_variable> -#include <functional> #include <list> #include <memory> #include <mutex> +#include <thread> #include <grpc++/support/config.h> -#include <grpc/support/thd.h> namespace grpc { class ThreadManager { public: - ThreadManager(int min_pollers, int max_pollers, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), - void*, const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner); + explicit ThreadManager(int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -55,8 +50,6 @@ class ThreadManager { // - ThreadManager does not interpret the values of 'tag' and 'ok' // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to // DoWork() - // - ThreadManager will also pass DoWork a bool saying if there are actually - // resources to do the work // // If the return value is SHUTDOWN:, // - ThreadManager WILL NOT call DoWork() and terminates the thead @@ -76,7 +69,7 @@ class ThreadManager { // The implementation of DoWork() should also do any setup needed to ensure // that the next call to PollForWork() (not necessarily by the current thread) // actually finds some work - virtual void DoWork(void* tag, bool ok, bool resources) = 0; + virtual void DoWork(void* tag, bool ok) = 0; // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which @@ -91,15 +84,15 @@ class ThreadManager { virtual void Wait(); private: - // Helper wrapper class around thread. This takes a ThreadManager object - // and starts a new thread to calls the Run() function. + // Helper wrapper class around std::thread. This takes a ThreadManager object + // and starts a new std::thread to calls the Run() function. // // The Run() function calls ThreadManager::MainWorkLoop() function and once // that completes, it marks the WorkerThread completed by calling // ThreadManager::MarkAsCompleted() class WorkerThread { public: - WorkerThread(ThreadManager* thd_mgr, bool* valid); + WorkerThread(ThreadManager* thd_mgr); ~WorkerThread(); private: @@ -109,8 +102,7 @@ class ThreadManager { ThreadManager* const thd_mgr_; std::mutex wt_mu_; - gpr_thd_id thd_; - bool valid_; + std::thread thd_; }; // The main funtion in ThreadManager @@ -137,13 +129,6 @@ class ThreadManager { // currently polling i.e num_pollers_) int num_threads_; - // Functions for creating/joining threads. Normally, these should - // be gpr_thd_new/gpr_thd_join but they are overridable - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator_; - std::function<void(gpr_thd_id)> thread_joiner_; - std::mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; |