diff options
author | 2018-07-10 13:30:57 -0700 | |
---|---|---|
committer | 2018-07-13 01:28:10 -0700 | |
commit | 37d8bbc32dd7929682f9dacd4b7041f76f169877 (patch) | |
tree | 7c039054a298e3d701b5c58ca67ad84e14e553d5 /src/core/lib/iomgr/executor.cc | |
parent | 7cb6e9dca755ee999dfcce1722f574053eb223a3 (diff) |
resolver and default executors
Diffstat (limited to 'src/core/lib/iomgr/executor.cc')
-rw-r--r-- | src/core/lib/iomgr/executor.cc | 114 |
1 files changed, 79 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 1ad13b831d..d87eb4fbf8 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -44,7 +44,7 @@ grpc_core::TraceFlag executor_trace(false, "executor"); GPR_TLS_DECL(g_this_thread_state); -GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) { +GrpcExecutor::GrpcExecutor(const char* name) : name_(name) { adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; gpr_atm_no_barrier_store(&num_threads_, 0); max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); @@ -298,60 +298,104 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } while (retry_push); } -static GrpcExecutor* global_executor; +static GrpcExecutor* executors[GRPC_NUM_EXECUTORS]; -void enqueue_long(grpc_closure* closure, grpc_error* error) { - global_executor->Enqueue(closure, error, false /* is_short */); +void default_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, + true /* is_short */); } -void enqueue_short(grpc_closure* closure, grpc_error* error) { - global_executor->Enqueue(closure, error, true /* is_short */); +void default_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, + false /* is_short */); } -// Short-Job executor scheduler -static const grpc_closure_scheduler_vtable global_executor_vtable_short = { - enqueue_short, enqueue_short, "executor-short"}; -static grpc_closure_scheduler global_scheduler_short = { - &global_executor_vtable_short}; +void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, + true /* is_short */); +} + +void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, + false /* is_short */); +} -// Long-job executor scheduler -static const grpc_closure_scheduler_vtable global_executor_vtable_long = { - enqueue_long, enqueue_long, "executor-long"}; -static grpc_closure_scheduler global_scheduler_long = { - &global_executor_vtable_long}; +static const grpc_closure_scheduler_vtable vtables_[] = { + {&default_enqueue_short, &default_enqueue_short, "def-ex-short"}, + {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}, + {&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"}, + {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}; + +static grpc_closure_scheduler schedulers_[] = { + {&vtables_[0]}, // Default short + {&vtables_[1]}, // Default long + {&vtables_[2]}, // Resolver short + {&vtables_[3]} // Resolver long +}; + +const char* executor_name(GrpcExecutorType executor_type) { + switch (executor_type) { + case GRPC_DEFAULT_EXECUTOR: + return "default-executor"; + case GRPC_RESOLVER_EXECUTOR: + return "resolver-executor"; + default: + GPR_UNREACHABLE_CODE(return "unknown"); + } + GPR_UNREACHABLE_CODE(return "unknown"); +} // grpc_executor_init() and grpc_executor_shutdown() functions are called in the // the grpc_init() and grpc_shutdown() code paths which are protected by a // global mutex. So it is okay to assume that these functions are thread-safe void grpc_executor_init() { - if (global_executor != nullptr) { - // grpc_executor_init() already called once (and grpc_executor_shutdown() - // wasn't called) - return; + for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { + // Return if grpc_executor_init() already called earlier + if (executors[i] != nullptr) { + GPR_ASSERT(i == 0); + break; + } + + executors[i] = grpc_core::New<GrpcExecutor>( + executor_name(static_cast<GrpcExecutorType>(i))); + executors[i]->Init(); } +} - global_executor = grpc_core::New<GrpcExecutor>("global-executor"); - global_executor->Init(); +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, + GrpcExecutorJobType job_type) { + return &schedulers_[(executor_type * GRPC_NUM_EXECUTORS) + job_type]; +} + +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { + return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type); } void grpc_executor_shutdown() { - // Shutdown already called - if (global_executor == nullptr) { - return; - } + for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { + // Return if grpc_executor_shutdown() is already called earlier + if (executors[i] == nullptr) { + GPR_ASSERT(i == 0); + break; + } - global_executor->Shutdown(); - grpc_core::Delete<GrpcExecutor>(global_executor); - global_executor = nullptr; + executors[i]->Shutdown(); + grpc_core::Delete<GrpcExecutor>(executors[i]); + executors[i] = nullptr; + } } -bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); } +bool grpc_executor_is_threaded(GrpcExecutorType executor_type) { + GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS); + return executors[executor_type]->IsThreaded(); +} -void grpc_executor_set_threading(bool enable) { - global_executor->SetThreading(enable); +bool grpc_executor_is_threaded() { + return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR); } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { - return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short - : &global_scheduler_long; +void grpc_executor_set_threading(bool enable) { + for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { + executors[i]->SetThreading(enable); + } } |