diff options
author | Sree Kuchibhotla <sreek@google.com> | 2018-07-10 11:29:43 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2018-07-10 11:29:43 -0700 |
commit | 83d0bfa3dbc6a15abb3821f6b16a0b1535e4c880 (patch) | |
tree | f9c99d6f2a52d3bee9f90e41b210d256ec4c5df8 /src | |
parent | 7e9d52530d0145fe8202d2fd35621407745e91ab (diff) |
address code review comemnts
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/executor.cc | 144 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 21 |
2 files changed, 81 insertions, 84 deletions
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index e9e6d0a4bb..f72e394423 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -35,18 +35,19 @@ #define MAX_DEPTH 2 -#define EXECUTOR_TRACE(format, ...) \ - if (executor_trace.enabled()) { \ - gpr_log(GPR_INFO, format, __VA_ARGS__); \ +#define EXECUTOR_TRACE(format, ...) \ + if (executor_trace.enabled()) { \ + gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \ } grpc_core::TraceFlag executor_trace(false, "executor"); GPR_TLS_DECL(g_this_thread_state); -GrpcExecutor::GrpcExecutor(const char* executor_name) : name(executor_name) { - adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; - gpr_atm_no_barrier_store(&num_threads, 0); +GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) { + adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; + gpr_atm_no_barrier_store(&num_threads_, 0); + max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); } void GrpcExecutor::Init() { SetThreading(true); } @@ -59,11 +60,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) { grpc_closure* next = c->next_data.next; grpc_error* error = c->error_data.error; #ifndef NDEBUG - EXECUTOR_TRACE("EXECUTOR: run %p [created by %s:%d]", c, c->file_created, + EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created, c->line_created); c->scheduled = false; #else - EXECUTOR_TRACE("EXECUTOR: run %p", c); + EXECUTOR_TRACE("run %p", c); #endif c->cb(c->cb_arg, error); GRPC_ERROR_UNREF(error); @@ -75,62 +76,60 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) { return n; } -bool GrpcExecutor::IsThreaded() { - return gpr_atm_no_barrier_load(&num_threads) > 0; +bool GrpcExecutor::IsThreaded() const { + return gpr_atm_no_barrier_load(&num_threads_) > 0; } void GrpcExecutor::SetThreading(bool threading) { - gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads); + const gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_); if (threading) { if (curr_num_threads > 0) return; - // TODO (sreek): max_threads initialization can be moved into the - // constructor - max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); - gpr_atm_no_barrier_store(&num_threads, 1); - gpr_tls_init(&g_this_thread_state); - thd_state = static_cast<thread_state*>( - gpr_zalloc(sizeof(thread_state) * max_threads)); - - for (size_t i = 0; i < max_threads; i++) { - gpr_mu_init(&thd_state[i].mu); - gpr_cv_init(&thd_state[i].cv); - thd_state[i].id = i; - thd_state[i].thd = grpc_core::Thread(); - thd_state[i].elems = GRPC_CLOSURE_LIST_INIT; + GPR_ASSERT(num_threads_ == 0); + gpr_atm_no_barrier_store(&num_threads_, 1); + gpr_tls_init(&g_this_thread_state_); + thd_state_ = static_cast<ThreadState*>( + gpr_zalloc(sizeof(ThreadState) * max_threads_)); + + for (size_t i = 0; i < max_threads_; i++) { + gpr_mu_init(&thd_state_[i].mu); + gpr_cv_init(&thd_state_[i].cv); + thd_state_[i].id = i; + thd_state_[i].thd = grpc_core::Thread(); + thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT; } - thd_state[0].thd = - grpc_core::Thread(name, &GrpcExecutor::ThreadMain, &thd_state[0]); - thd_state[0].thd.Start(); + thd_state_[0].thd = + grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]); + thd_state_[0].thd.Start(); } else { if (curr_num_threads == 0) return; - for (size_t i = 0; i < max_threads; i++) { - gpr_mu_lock(&thd_state[i].mu); - thd_state[i].shutdown = true; - gpr_cv_signal(&thd_state[i].cv); - gpr_mu_unlock(&thd_state[i].mu); + for (size_t i = 0; i < max_threads_; i++) { + gpr_mu_lock(&thd_state_[i].mu); + thd_state_[i].shutdown = true; + gpr_cv_signal(&thd_state_[i].cv); + gpr_mu_unlock(&thd_state_[i].mu); } /* Ensure no thread is adding a new thread. Once this is past, then no * thread will try to add a new one either (since shutdown is true) */ - gpr_spinlock_lock(&adding_thread_lock); - gpr_spinlock_unlock(&adding_thread_lock); + gpr_spinlock_lock(&adding_thread_lock_); + gpr_spinlock_unlock(&adding_thread_lock_); - for (gpr_atm i = 0; i < num_threads; i++) { - thd_state[i].thd.Join(); + for (gpr_atm i = 0; i < num_threads_; i++) { + thd_state_[i].thd.Join(); } - gpr_atm_no_barrier_store(&num_threads, 0); - for (size_t i = 0; i < max_threads; i++) { - gpr_mu_destroy(&thd_state[i].mu); - gpr_cv_destroy(&thd_state[i].cv); - RunClosures(thd_state[i].elems); + gpr_atm_no_barrier_store(&num_threads_, 0); + for (size_t i = 0; i < max_threads_; i++) { + gpr_mu_destroy(&thd_state_[i].mu); + gpr_cv_destroy(&thd_state_[i].cv); + RunClosures(thd_state_[i].elems); } - gpr_free(thd_state); + gpr_free(thd_state_); gpr_tls_destroy(&g_this_thread_state); } } @@ -138,14 +137,14 @@ void GrpcExecutor::SetThreading(bool threading) { void GrpcExecutor::Shutdown() { SetThreading(false); } void GrpcExecutor::ThreadMain(void* arg) { - thread_state* ts = static_cast<thread_state*>(arg); + ThreadState* ts = static_cast<ThreadState*>(arg); gpr_tls_set(&g_this_thread_state, (intptr_t)ts); grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD); size_t subtract_depth = 0; for (;;) { - EXECUTOR_TRACE("EXECUTOR[%ld]: step (sub_depth=%" PRIdPTR ")", ts->id, + EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id, subtract_depth); gpr_mu_lock(&ts->mu); @@ -157,7 +156,7 @@ void GrpcExecutor::ThreadMain(void* arg) { } if (ts->shutdown) { - EXECUTOR_TRACE("EXECUTOR[%ld]: shutdown", ts->id); + EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id); gpr_mu_unlock(&ts->mu); break; } @@ -167,7 +166,7 @@ void GrpcExecutor::ThreadMain(void* arg) { ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); - EXECUTOR_TRACE("EXECUTOR[%ld]: execute", ts->id); + EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id); grpc_core::ExecCtx::Get()->InvalidateNow(); subtract_depth = RunClosures(closures); @@ -186,41 +185,42 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, do { retry_push = false; size_t cur_thread_count = - static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads)); + static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_)); // If the number of threads is zero(i.e either the executor is not threaded // or already shutdown), then queue the closure on the exec context itself if (cur_thread_count == 0) { #ifndef NDEBUG - EXECUTOR_TRACE("EXECUTOR: schedule %p (created %s:%d) inline", closure, + EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure, closure->file_created, closure->line_created); #else - EXECUTOR_TRACE("EXECUTOR: schedule %p inline", closure); + EXECUTOR_TRACE("schedule %p inline", closure); #endif grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, error); return; } - thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state); + ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state); if (ts == nullptr) { - ts = &thd_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(), - cur_thread_count)]; + ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(), + cur_thread_count)]; } else { GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(); } - thread_state* orig_ts = ts; + ThreadState* orig_ts = ts; - bool try_new_thread; + bool try_new_thread = false; for (;;) { #ifndef NDEBUG EXECUTOR_TRACE( - "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %ld", + "try to schedule %p (%s) (created %s:%d) to thread " + "%" PRIdPTR, closure, is_short ? "short" : "long", closure->file_created, closure->line_created, ts->id); #else - EXECUTOR_TRACE("EXECUTOR: try to schedule %p (%s) to thread %ld", closure, + EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure, is_short ? "short" : "long", ts->id); #endif @@ -231,7 +231,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, // guarantee no starvation). Spin through queues and try again gpr_mu_unlock(&ts->mu); size_t idx = ts->id; - ts = &thd_state[(idx + 1) % cur_thread_count]; + ts = &thd_state_[(idx + 1) % cur_thread_count]; if (ts == orig_ts) { // We cycled through all the threads. Retry enqueue again (by creating // a new thread) @@ -265,7 +265,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, // thread, use this as a hint to create more threads ts->depth++; try_new_thread = ts->depth > MAX_DEPTH && - cur_thread_count < max_threads && !ts->shutdown; + cur_thread_count < max_threads_ && !ts->shutdown; ts->queued_long_job = !is_short; @@ -273,20 +273,20 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, break; } - if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock)) { + if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) { cur_thread_count = - static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads)); - if (cur_thread_count < max_threads) { + static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_)); + if (cur_thread_count < max_threads_) { // Increment num_threads (Safe to do a no_barrier_store instead of a // cas because we always increment num_threads under the // 'adding_thread_lock') - gpr_atm_no_barrier_store(&num_threads, cur_thread_count + 1); + gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1); - thd_state[cur_thread_count].thd = grpc_core::Thread( - name, &GrpcExecutor::ThreadMain, &thd_state[cur_thread_count]); - thd_state[cur_thread_count].thd.Start(); + thd_state_[cur_thread_count].thd = grpc_core::Thread( + name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]); + thd_state_[cur_thread_count].thd.Start(); } - gpr_spinlock_unlock(&adding_thread_lock); + gpr_spinlock_unlock(&adding_thread_lock_); } if (retry_push) { @@ -298,11 +298,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, static GrpcExecutor g_global_executor("grpc-executor"); void enqueue_long(grpc_closure* closure, grpc_error* error) { - g_global_executor.Enqueue(closure, error, false); + g_global_executor.Enqueue(closure, error, false /* is_short */); } void enqueue_short(grpc_closure* closure, grpc_error* error) { - g_global_executor.Enqueue(closure, error, true); + g_global_executor.Enqueue(closure, error, true /* is_short */); } // Short-Job executor scheduler @@ -328,7 +328,7 @@ void grpc_executor_set_threading(bool enable) { } grpc_closure_scheduler* grpc_executor_scheduler( - grpc_executor_job_length length) { - return length == GRPC_EXECUTOR_SHORT ? &global_scheduler_short - : &global_scheduler_long; + grpc_executor_job_type job_type) { + return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short + : &global_scheduler_long; } diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index cafe47decb..b6515605cb 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -34,12 +34,9 @@ typedef struct { bool shutdown; bool queued_long_job; grpc_core::Thread thd; -} thread_state; +} ThreadState; -typedef enum { - GRPC_EXECUTOR_SHORT, - GRPC_EXECUTOR_LONG -} grpc_executor_job_length; +typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } grpc_executor_job_type; class GrpcExecutor { public: @@ -47,7 +44,7 @@ class GrpcExecutor { void Init(); /** Is the executor multi-threaded? */ - bool IsThreaded(); + bool IsThreaded() const; /* Enable/disable threading - must be called after Init and Shutdown() */ void SetThreading(bool threading); @@ -63,11 +60,11 @@ class GrpcExecutor { static size_t RunClosures(grpc_closure_list list); static void ThreadMain(void* arg); - const char* name; - thread_state* thd_state; - size_t max_threads; - gpr_atm num_threads; - gpr_spinlock adding_thread_lock; + const char* name_; + ThreadState* thd_state_; + size_t max_threads_; + gpr_atm num_threads_; + gpr_spinlock adding_thread_lock_; }; // == Global executor functions == @@ -75,7 +72,7 @@ class GrpcExecutor { void grpc_executor_init(); grpc_closure_scheduler* grpc_executor_scheduler( - grpc_executor_job_length length); + grpc_executor_job_type job_type); void grpc_executor_shutdown(); |