diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 2 | ||||
-rw-r--r-- | src/core/lib/gpr/arena.cc | 88 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.cc | 220 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 45 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.cc | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_windows.cc | 5 | ||||
-rw-r--r-- | src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc | 5 | ||||
-rw-r--r-- | src/core/lib/security/security_connector/security_connector.cc | 9 | ||||
-rw-r--r-- | src/core/lib/surface/call.cc | 3 | ||||
-rw-r--r-- | src/core/lib/surface/version.cc | 2 |
10 files changed, 262 insertions, 122 deletions
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 7581f937b6..35c3fb01ea 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -82,7 +82,7 @@ typedef struct { typedef struct { grpc_call_stats stats; grpc_status_code final_status; - const char** error_string; + const char* error_string; } grpc_call_final_info; /* Channel filters specify: diff --git a/src/core/lib/gpr/arena.cc b/src/core/lib/gpr/arena.cc index 141de69426..e30b297aea 100644 --- a/src/core/lib/gpr/arena.cc +++ b/src/core/lib/gpr/arena.cc @@ -25,6 +25,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include "src/core/lib/gpr/alloc.h" @@ -36,8 +37,6 @@ #ifdef SIMPLE_ARENA_FOR_DEBUGGING -#include <grpc/support/sync.h> - struct gpr_arena { gpr_mu mu; void** ptrs; @@ -78,14 +77,17 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) { // would allow us to use the alignment actually needed by the caller. typedef struct zone { - size_t size_begin; - size_t size_end; - gpr_atm next_atm; + size_t size_begin; // All the space we have set aside for allocations up + // until this zone. + size_t size_end; // size_end = size_begin plus all the space we set aside for + // allocations in zone z itself. + zone* next; } zone; struct gpr_arena { gpr_atm size_so_far; zone initial_zone; + gpr_mu arena_growth_mutex; }; static void* zalloc_aligned(size_t size) { @@ -99,15 +101,17 @@ gpr_arena* gpr_arena_create(size_t initial_size) { gpr_arena* a = static_cast<gpr_arena*>(zalloc_aligned( GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + initial_size)); a->initial_zone.size_end = initial_size; + gpr_mu_init(&a->arena_growth_mutex); return a; } size_t gpr_arena_destroy(gpr_arena* arena) { + gpr_mu_destroy(&arena->arena_growth_mutex); gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far); - zone* z = (zone*)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm); + zone* z = arena->initial_zone.next; gpr_free_aligned(arena); while (z) { - zone* next_z = (zone*)gpr_atm_no_barrier_load(&z->next_atm); + zone* next_z = z->next; gpr_free_aligned(z); z = next_z; } @@ -116,37 +120,55 @@ size_t gpr_arena_destroy(gpr_arena* arena) { void* gpr_arena_alloc(gpr_arena* arena, size_t size) { size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(size); - size_t start = static_cast<size_t>( + size_t previous_size_of_arena_allocations = static_cast<size_t>( gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size)); + size_t updated_size_of_arena_allocations = + previous_size_of_arena_allocations + size; zone* z = &arena->initial_zone; - while (start > z->size_end) { - zone* next_z = (zone*)gpr_atm_acq_load(&z->next_atm); - if (next_z == nullptr) { - size_t next_z_size = - static_cast<size_t>(gpr_atm_no_barrier_load(&arena->size_so_far)); - next_z = static_cast<zone*>(zalloc_aligned( - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size)); - next_z->size_begin = z->size_end; - next_z->size_end = z->size_end + next_z_size; - if (!gpr_atm_rel_cas(&z->next_atm, static_cast<gpr_atm>(NULL), - (gpr_atm)next_z)) { - gpr_free_aligned(next_z); - next_z = (zone*)gpr_atm_acq_load(&z->next_atm); + // Check to see if the allocation isn't able to end in the initial zone. + // This statement is true only in the uncommon case because of our arena + // sizing historesis (that is, most calls should have a large enough initial + // zone and will not need to grow the arena). + if (updated_size_of_arena_allocations > z->size_end) { + // Find a zone to fit this allocation + gpr_mu_lock(&arena->arena_growth_mutex); + while (updated_size_of_arena_allocations > z->size_end) { + if (z->next == nullptr) { + // Note that we do an extra increment of size_so_far to prevent multiple + // simultaneous callers from stepping on each other. However, this extra + // increment means some space in the arena is wasted. + // So whenever we need to allocate x bytes and there are x - n (where + // n > 0) remaining in the current zone, we will waste x bytes (x - n + // in the current zone and n in the new zone). + previous_size_of_arena_allocations = static_cast<size_t>( + gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size)); + updated_size_of_arena_allocations = + previous_size_of_arena_allocations + size; + size_t next_z_size = updated_size_of_arena_allocations; + z->next = static_cast<zone*>(zalloc_aligned( + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size)); + z->next->size_begin = z->size_end; + z->next->size_end = z->size_end + next_z_size; } + z = z->next; } - z = next_z; - } - if (start + size > z->size_end) { - return gpr_arena_alloc(arena, size); + gpr_mu_unlock(&arena->arena_growth_mutex); } - GPR_ASSERT(start >= z->size_begin); - GPR_ASSERT(start + size <= z->size_end); - char* ptr = (z == &arena->initial_zone) - ? reinterpret_cast<char*>(arena) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) - : reinterpret_cast<char*>(z) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)); - return ptr + start - z->size_begin; + GPR_ASSERT(previous_size_of_arena_allocations >= z->size_begin); + GPR_ASSERT(updated_size_of_arena_allocations <= z->size_end); + // Skip the first part of the zone, which just contains tracking information. + // For the initial zone, this is the gpr_arena struct and for any other zone, + // it's the zone struct. + char* start_of_allocation_space = + (z == &arena->initial_zone) + ? reinterpret_cast<char*>(arena) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + : reinterpret_cast<char*>(z) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)); + // previous_size_of_arena_allocations - size_begin is how many bytes have been + // allocated into the current zone + return start_of_allocation_space + previous_size_of_arena_allocations - + z->size_begin; } #endif // SIMPLE_ARENA_FOR_DEBUGGING diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 1ad13b831d..45d96b80eb 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -40,19 +40,25 @@ gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \ } +#define EXECUTOR_TRACE0(str) \ + if (executor_trace.enabled()) { \ + gpr_log(GPR_INFO, "EXECUTOR " str); \ + } + grpc_core::TraceFlag executor_trace(false, "executor"); GPR_TLS_DECL(g_this_thread_state); -GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) { +GrpcExecutor::GrpcExecutor(const char* name) : name_(name) { adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; - gpr_atm_no_barrier_store(&num_threads_, 0); + gpr_atm_rel_store(&num_threads_, 0); max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); } void GrpcExecutor::Init() { SetThreading(true); } -size_t GrpcExecutor::RunClosures(grpc_closure_list list) { +size_t GrpcExecutor::RunClosures(const char* executor_name, + grpc_closure_list list) { size_t n = 0; grpc_closure* c = list.head; @@ -60,11 +66,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) { grpc_closure* next = c->next_data.next; grpc_error* error = c->error_data.error; #ifndef NDEBUG - EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created, - c->line_created); + EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c, + c->file_created, c->line_created); c->scheduled = false; #else - EXECUTOR_TRACE("run %p", c); + EXECUTOR_TRACE("(%s) run %p", executor_name, c); #endif c->cb(c->cb_arg, error); GRPC_ERROR_UNREF(error); @@ -77,17 +83,21 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) { } bool GrpcExecutor::IsThreaded() const { - return gpr_atm_no_barrier_load(&num_threads_) > 0; + return gpr_atm_acq_load(&num_threads_) > 0; } void GrpcExecutor::SetThreading(bool threading) { - gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_); + gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_); + EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading); if (threading) { - if (curr_num_threads > 0) return; + if (curr_num_threads > 0) { + EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads == 0", name_); + return; + } GPR_ASSERT(num_threads_ == 0); - gpr_atm_no_barrier_store(&num_threads_, 1); + gpr_atm_rel_store(&num_threads_, 1); gpr_tls_init(&g_this_thread_state); thd_state_ = static_cast<ThreadState*>( gpr_zalloc(sizeof(ThreadState) * max_threads_)); @@ -96,6 +106,7 @@ void GrpcExecutor::SetThreading(bool threading) { gpr_mu_init(&thd_state_[i].mu); gpr_cv_init(&thd_state_[i].cv); thd_state_[i].id = i; + thd_state_[i].name = name_; thd_state_[i].thd = grpc_core::Thread(); thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT; } @@ -104,7 +115,10 @@ void GrpcExecutor::SetThreading(bool threading) { grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]); thd_state_[0].thd.Start(); } else { // !threading - if (curr_num_threads == 0) return; + if (curr_num_threads == 0) { + EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_); + return; + } for (size_t i = 0; i < max_threads_; i++) { gpr_mu_lock(&thd_state_[i].mu); @@ -121,20 +135,22 @@ void GrpcExecutor::SetThreading(bool threading) { curr_num_threads = gpr_atm_no_barrier_load(&num_threads_); for (gpr_atm i = 0; i < curr_num_threads; i++) { thd_state_[i].thd.Join(); - EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i, - curr_num_threads); + EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_, + i + 1, curr_num_threads); } - gpr_atm_no_barrier_store(&num_threads_, 0); + gpr_atm_rel_store(&num_threads_, 0); for (size_t i = 0; i < max_threads_; i++) { gpr_mu_destroy(&thd_state_[i].mu); gpr_cv_destroy(&thd_state_[i].cv); - RunClosures(thd_state_[i].elems); + RunClosures(thd_state_[i].name, thd_state_[i].elems); } gpr_free(thd_state_); gpr_tls_destroy(&g_this_thread_state); } + + EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); } void GrpcExecutor::Shutdown() { SetThreading(false); } @@ -147,8 +163,8 @@ void GrpcExecutor::ThreadMain(void* arg) { size_t subtract_depth = 0; for (;;) { - EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id, - subtract_depth); + EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", + ts->name, ts->id, subtract_depth); gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; @@ -159,7 +175,7 @@ void GrpcExecutor::ThreadMain(void* arg) { } if (ts->shutdown) { - EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id); + EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id); gpr_mu_unlock(&ts->mu); break; } @@ -169,10 +185,10 @@ void GrpcExecutor::ThreadMain(void* arg) { ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); - EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id); + EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id); grpc_core::ExecCtx::Get()->InvalidateNow(); - subtract_depth = RunClosures(closures); + subtract_depth = RunClosures(ts->name, closures); } } @@ -188,16 +204,16 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, do { retry_push = false; size_t cur_thread_count = - static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_)); + static_cast<size_t>(gpr_atm_acq_load(&num_threads_)); // If the number of threads is zero(i.e either the executor is not threaded // or already shutdown), then queue the closure on the exec context itself if (cur_thread_count == 0) { #ifndef NDEBUG - EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure, + EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure, closure->file_created, closure->line_created); #else - EXECUTOR_TRACE("schedule %p inline", closure); + EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure); #endif grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, error); @@ -213,18 +229,18 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } ThreadState* orig_ts = ts; - bool try_new_thread = false; + for (;;) { #ifndef NDEBUG EXECUTOR_TRACE( - "try to schedule %p (%s) (created %s:%d) to thread " + "(%s) try to schedule %p (%s) (created %s:%d) to thread " "%" PRIdPTR, - closure, is_short ? "short" : "long", closure->file_created, + name_, closure, is_short ? "short" : "long", closure->file_created, closure->line_created, ts->id); #else - EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure, - is_short ? "short" : "long", ts->id); + EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_, + closure, is_short ? "short" : "long", ts->id); #endif gpr_mu_lock(&ts->mu); @@ -236,18 +252,22 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, size_t idx = ts->id; ts = &thd_state_[(idx + 1) % cur_thread_count]; if (ts == orig_ts) { - // We cycled through all the threads. Retry enqueue again (by creating - // a new thread) + // We cycled through all the threads. Retry enqueue again by creating + // a new thread + // + // TODO (sreek): There is a potential issue here. We are + // unconditionally setting try_new_thread to true here. What if the + // executor is shutdown OR if cur_thread_count is already equal to + // max_threads ? + // (Fortunately, this is not an issue yet (as of july 2018) because + // there is only one instance of long job in gRPC and hence we will + // not hit this code path) retry_push = true; - // TODO (sreek): What if the executor is shutdown OR if - // cur_thread_count is already equal to max_threads ? (currently - as - // of July 2018, we do not run in to this issue because there is only - // one instance of long job in gRPC. This has to be fixed soon) try_new_thread = true; break; } - continue; + continue; // Try the next thread-state } // == Found the thread state (i.e thread) to enqueue this closure! == @@ -277,13 +297,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) { - cur_thread_count = - static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_)); + cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_)); if (cur_thread_count < max_threads_) { - // Increment num_threads (Safe to do a no_barrier_store instead of a - // cas because we always increment num_threads under the - // 'adding_thread_lock') - gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1); + // Increment num_threads (safe to do a store instead of a cas because we + // always increment num_threads under the 'adding_thread_lock') + gpr_atm_rel_store(&num_threads_, cur_thread_count + 1); thd_state_[cur_thread_count].thd = grpc_core::Thread( name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]); @@ -298,60 +316,118 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } while (retry_push); } -static GrpcExecutor* global_executor; +static GrpcExecutor* executors[GRPC_NUM_EXECUTORS]; -void enqueue_long(grpc_closure* closure, grpc_error* error) { - global_executor->Enqueue(closure, error, false /* is_short */); +void default_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, + true /* is_short */); } -void enqueue_short(grpc_closure* closure, grpc_error* error) { - global_executor->Enqueue(closure, error, true /* is_short */); +void default_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, + false /* is_short */); } -// Short-Job executor scheduler -static const grpc_closure_scheduler_vtable global_executor_vtable_short = { - enqueue_short, enqueue_short, "executor-short"}; -static grpc_closure_scheduler global_scheduler_short = { - &global_executor_vtable_short}; +void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, + true /* is_short */); +} -// Long-job executor scheduler -static const grpc_closure_scheduler_vtable global_executor_vtable_long = { - enqueue_long, enqueue_long, "executor-long"}; -static grpc_closure_scheduler global_scheduler_long = { - &global_executor_vtable_long}; +void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, + false /* is_short */); +} + +static const grpc_closure_scheduler_vtable + vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { + {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"}, + {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, + {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"}, + {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}}; + +static grpc_closure_scheduler + schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { + {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]}, + {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}}, + {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]}, + {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}}; // grpc_executor_init() and grpc_executor_shutdown() functions are called in the // the grpc_init() and grpc_shutdown() code paths which are protected by a // global mutex. So it is okay to assume that these functions are thread-safe void grpc_executor_init() { - if (global_executor != nullptr) { - // grpc_executor_init() already called once (and grpc_executor_shutdown() - // wasn't called) + EXECUTOR_TRACE0("grpc_executor_init() enter"); + + // Return if grpc_executor_init() is already called earlier + if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) { + GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr); return; } - global_executor = grpc_core::New<GrpcExecutor>("global-executor"); - global_executor->Init(); + executors[GRPC_DEFAULT_EXECUTOR] = + grpc_core::New<GrpcExecutor>("default-executor"); + executors[GRPC_RESOLVER_EXECUTOR] = + grpc_core::New<GrpcExecutor>("resolver-executor"); + + executors[GRPC_DEFAULT_EXECUTOR]->Init(); + executors[GRPC_RESOLVER_EXECUTOR]->Init(); + + EXECUTOR_TRACE0("grpc_executor_init() done"); +} + +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, + GrpcExecutorJobType job_type) { + return &schedulers_[executor_type][job_type]; +} + +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { + return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type); } void grpc_executor_shutdown() { - // Shutdown already called - if (global_executor == nullptr) { + EXECUTOR_TRACE0("grpc_executor_shutdown() enter"); + + // Return if grpc_executor_shutdown() is already called earlier + if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) { + GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr); return; } - global_executor->Shutdown(); - grpc_core::Delete<GrpcExecutor>(global_executor); - global_executor = nullptr; + executors[GRPC_DEFAULT_EXECUTOR]->Shutdown(); + executors[GRPC_RESOLVER_EXECUTOR]->Shutdown(); + + // Delete the executor objects. + // + // NOTE: It is important to call Shutdown() on all executors first before + // calling Delete() because it is possible for one executor (that is not + // shutdown yet) to call Enqueue() on a different executor which is already + // shutdown. This is legal and in such cases, the Enqueue() operation + // effectively "fails" and enqueues that closure on the calling thread's + // exec_ctx. + // + // By ensuring that all executors are shutdown first, we are also ensuring + // that no thread is active across all executors. + + grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]); + grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]); + executors[GRPC_DEFAULT_EXECUTOR] = nullptr; + executors[GRPC_RESOLVER_EXECUTOR] = nullptr; + + EXECUTOR_TRACE0("grpc_executor_shutdown() done"); } -bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); } +bool grpc_executor_is_threaded(GrpcExecutorType executor_type) { + GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS); + return executors[executor_type]->IsThreaded(); +} -void grpc_executor_set_threading(bool enable) { - global_executor->SetThreading(enable); +bool grpc_executor_is_threaded() { + return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR); } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { - return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short - : &global_scheduler_long; +void grpc_executor_set_threading(bool enable) { + EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable); + for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { + executors[i]->SetThreading(enable); + } } diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 395fc52863..8829138c5f 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -27,7 +27,8 @@ typedef struct { gpr_mu mu; - size_t id; // For debugging purposes + size_t id; // For debugging purposes + const char* name; // Thread state name gpr_cv cv; grpc_closure_list elems; size_t depth; // Number of closures in the closure list @@ -36,7 +37,11 @@ typedef struct { grpc_core::Thread thd; } ThreadState; -typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType; +typedef enum { + GRPC_EXECUTOR_SHORT = 0, + GRPC_EXECUTOR_LONG, + GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this +} GrpcExecutorJobType; class GrpcExecutor { public: @@ -58,7 +63,7 @@ class GrpcExecutor { void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short); private: - static size_t RunClosures(grpc_closure_list list); + static size_t RunClosures(const char* executor_name, grpc_closure_list list); static void ThreadMain(void* arg); const char* name_; @@ -70,14 +75,42 @@ class GrpcExecutor { // == Global executor functions == +typedef enum { + GRPC_DEFAULT_EXECUTOR = 0, + GRPC_RESOLVER_EXECUTOR, + + GRPC_NUM_EXECUTORS // Add new values above this +} GrpcExecutorType; + +// TODO(sreek): Currently we have two executors (available globally): The +// default executor and the resolver executor. +// +// Some of the functions below operate on the DEFAULT executor only while some +// operate of ALL the executors. This is a bit confusing and should be cleaned +// up in future (where we make all the following functions take executor_type +// and/or job_type) + +// Initialize ALL the executors void grpc_executor_init(); +// Shutdown ALL the executors +void grpc_executor_shutdown(); + +// Set the threading mode for ALL the executors +void grpc_executor_set_threading(bool enable); + +// Get the DEFAULT executor scheduler for the given job_type grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type); -void grpc_executor_shutdown(); +// Get the executor scheduler for a given executor_type and a job_type +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, + GrpcExecutorJobType job_type); -bool grpc_executor_is_threaded(); +// Return if a given executor is running in threaded mode (i.e if +// grpc_executor_set_threading(true) was called previously on that executor) +bool grpc_executor_is_threaded(GrpcExecutorType executor_type); -void grpc_executor_set_threading(bool enable); +// Return if the DEFAULT executor is threaded +bool grpc_executor_is_threaded(); #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 7a825643e1..c285d7eca6 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -166,8 +166,9 @@ static void posix_resolve_address(const char* name, const char* default_port, grpc_closure* on_done, grpc_resolved_addresses** addrs) { request* r = static_cast<request*>(gpr_malloc(sizeof(request))); - GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); + GRPC_CLOSURE_INIT( + &r->request_closure, do_request_thread, r, + grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 71c92615ad..3e977dca2d 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -151,8 +151,9 @@ static void windows_resolve_address(const char* name, const char* default_port, grpc_closure* on_done, grpc_resolved_addresses** addresses) { request* r = (request*)gpr_malloc(sizeof(request)); - GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); + GRPC_CLOSURE_INIT( + &r->request_closure, do_request_thread, r, + grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc b/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc index 7c4d7a71cd..8454fd7558 100644 --- a/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc +++ b/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc @@ -41,8 +41,9 @@ namespace internal { bool check_bios_data(const char* bios_data_file) { char* bios_data = read_bios_file(bios_data_file); - bool result = (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) || - (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE)); + bool result = + bios_data && ((!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) || + (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE))); gpr_free(bios_data); return result; } diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index cc72bb6164..59cf3a0af1 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -57,6 +57,10 @@ static const char* installed_roots_path = INSTALL_PREFIX "/share/grpc/roots.pem"; #endif +#ifndef TSI_OPENSSL_ALPN_SUPPORT +#define TSI_OPENSSL_ALPN_SUPPORT 1 +#endif + /* -- Overridden default roots. -- */ static grpc_ssl_roots_override_callback ssl_roots_override_cb = nullptr; @@ -850,7 +854,8 @@ grpc_auth_context* grpc_ssl_peer_to_auth_context(const tsi_peer* peer) { static grpc_error* ssl_check_peer(grpc_security_connector* sc, const char* peer_name, const tsi_peer* peer, grpc_auth_context** auth_context) { - /* Check the ALPN. */ +#if TSI_OPENSSL_ALPN_SUPPORT + /* Check the ALPN if ALPN is supported. */ const tsi_peer_property* p = tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL); if (p == nullptr) { @@ -861,7 +866,7 @@ static grpc_error* ssl_check_peer(grpc_security_connector* sc, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Cannot check peer: invalid ALPN value."); } - +#endif /* TSI_OPENSSL_ALPN_SUPPORT */ /* Check the peer name if specified. */ if (peer_name != nullptr && !grpc_ssl_host_matches_name(peer, peer_name)) { char* msg; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 88e015ce22..847bb87f7e 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -561,7 +561,7 @@ static void destroy_call(void* call, grpc_error* error) { } get_final_status(c, set_status_value_directly, &c->final_info.final_status, - nullptr, c->final_info.error_string); + nullptr, &(c->final_info.error_string)); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -573,6 +573,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info, GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); + gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string))); } void grpc_call_ref(grpc_call* c) { gpr_ref(&c->ext_ref); } diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index ac8cec2980..e92fe2c5a1 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -25,4 +25,4 @@ const char* grpc_version_string(void) { return "6.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "gladiolus"; } +const char* grpc_g_stands_for(void) { return "glider"; } |