diff options
Diffstat (limited to 'src/core/lib/iomgr')
86 files changed, 868 insertions, 1150 deletions
diff --git a/src/core/lib/iomgr/block_annotate.h b/src/core/lib/iomgr/block_annotate.h index fcbfe9eb1a..340ebcb1af 100644 --- a/src/core/lib/iomgr/block_annotate.h +++ b/src/core/lib/iomgr/block_annotate.h @@ -19,17 +19,9 @@ #ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H #define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H -#ifdef __cplusplus -extern "C" { -#endif - void gpr_thd_start_blocking_region(); void gpr_thd_end_blocking_region(); -#ifdef __cplusplus -} -#endif - /* These annotations identify the beginning and end of regions where the code may block for reasons other than synchronization functions. These include poll, epoll, and getaddrinfo. */ diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index 74b077de06..b5910b42e4 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -24,8 +24,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" -grpc_tracer_flag grpc_call_combiner_trace = - GRPC_TRACER_INITIALIZER(false, "call_combiner"); +grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner"); static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { if (cancel_state & 1) { @@ -63,7 +62,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, grpc_error* error DEBUG_ARGS, const char* reason) { GPR_TIMER_BEGIN("call_combiner_start", 0); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR "%s] error=%s", @@ -72,7 +71,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, } size_t prev_size = (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, prev_size + 1); } @@ -80,13 +79,13 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, if (prev_size == 0) { GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx); GPR_TIMER_MARK("call_combiner_initiate", 0); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); } // Queue was empty, so execute this closure immediately. GRPC_CLOSURE_SCHED(exec_ctx, closure, error); } else { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " QUEUING"); } // Queue was not empty, so add closure to queue. @@ -100,21 +99,21 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner DEBUG_ARGS, const char* reason) { GPR_TIMER_BEGIN("call_combiner_stop", 0); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", call_combiner DEBUG_FMT_ARGS, reason); } size_t prev_size = (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, prev_size - 1); } GPR_ASSERT(prev_size >= 1); if (prev_size > 1) { while (true) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " checking queue"); } bool empty; @@ -123,19 +122,19 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, if (closure == nullptr) { // This can happen either due to a race condition within the mpscq // code or because of a race with grpc_call_combiner_start(). - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " queue returned no result; checking again"); } continue; } - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s", closure, grpc_error_string(closure->error_data.error)); } GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error); break; } - } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + } else if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, " queue empty"); } GPR_TIMER_END("call_combiner_stop", 0); @@ -152,7 +151,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, // If error is set, invoke the cancellation closure immediately. // Otherwise, store the new closure. if (original_error != GRPC_ERROR_NONE) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, "call_combiner=%p: scheduling notify_on_cancel callback=%p " "for pre-existing cancellation", @@ -163,7 +162,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, } else { if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, (gpr_atm)closure)) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, "call_combiner=%p: setting notify_on_cancel=%p", call_combiner, closure); } @@ -172,7 +171,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, // up any resources they may be holding for the callback. if (original_state != 0) { closure = (grpc_closure*)original_state; - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, "call_combiner=%p: scheduling old cancel callback=%p", call_combiner, closure); @@ -201,7 +200,7 @@ void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, encode_cancel_state_error(error))) { if (original_state != 0) { grpc_closure* notify_on_cancel = (grpc_closure*)original_state; - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_DEBUG, "call_combiner=%p: scheduling notify_on_cancel callback=%p", call_combiner, notify_on_cancel); diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index 527f84fce0..c07af51c91 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -27,10 +27,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/mpscq.h" -#ifdef __cplusplus -extern "C" { -#endif - // A simple, lock-free mechanism for serializing activity related to a // single call. This is similar to a combiner but is more lightweight. // @@ -40,7 +36,7 @@ extern "C" { // when it is done with the action that was kicked off by the original // callback. -extern grpc_tracer_flag grpc_call_combiner_trace; +extern grpc_core::TraceFlag grpc_call_combiner_trace; typedef struct { gpr_atm size; // size_t, num closures in queue or currently executing @@ -122,8 +118,4 @@ void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner, grpc_error* error); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/iomgr/closure.cc b/src/core/lib/iomgr/closure.cc deleted file mode 100644 index 09257d258c..0000000000 --- a/src/core/lib/iomgr/closure.cc +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/closure.h" - -#include <assert.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -#include "src/core/lib/profiling/timers.h" - -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure"); -#endif - -#ifndef NDEBUG -grpc_closure* grpc_closure_init(const char* file, int line, - grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler) { -#else -grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler) { -#endif - closure->cb = cb; - closure->cb_arg = cb_arg; - closure->scheduler = scheduler; -#ifndef NDEBUG - closure->scheduled = false; - closure->file_initiated = nullptr; - closure->line_initiated = 0; - closure->run = false; - closure->file_created = file; - closure->line_created = line; -#endif - return closure; -} - -void grpc_closure_list_init(grpc_closure_list* closure_list) { - closure_list->head = closure_list->tail = nullptr; -} - -bool grpc_closure_list_append(grpc_closure_list* closure_list, - grpc_closure* closure, grpc_error* error) { - if (closure == nullptr) { - GRPC_ERROR_UNREF(error); - return false; - } - closure->error_data.error = error; - closure->next_data.next = nullptr; - bool was_empty = (closure_list->head == nullptr); - if (was_empty) { - closure_list->head = closure; - } else { - closure_list->tail->next_data.next = closure; - } - closure_list->tail = closure; - return was_empty; -} - -void grpc_closure_list_fail_all(grpc_closure_list* list, - grpc_error* forced_failure) { - for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) { - if (c->error_data.error == GRPC_ERROR_NONE) { - c->error_data.error = GRPC_ERROR_REF(forced_failure); - } - } - GRPC_ERROR_UNREF(forced_failure); -} - -bool grpc_closure_list_empty(grpc_closure_list closure_list) { - return closure_list.head == nullptr; -} - -void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst) { - if (src->head == nullptr) { - return; - } - if (dst->head == nullptr) { - *dst = *src; - } else { - dst->tail->next_data.next = src->head; - dst->tail = src->tail; - } - src->head = src->tail = nullptr; -} - -typedef struct { - grpc_iomgr_cb_func cb; - void* cb_arg; - grpc_closure wrapper; -} wrapped_closure; - -static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - wrapped_closure* wc = (wrapped_closure*)arg; - grpc_iomgr_cb_func cb = wc->cb; - void* cb_arg = wc->cb_arg; - gpr_free(wc); - cb(exec_ctx, cb_arg, error); -} - -#ifndef NDEBUG -grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { -#else -grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { -#endif - wrapped_closure* wc = (wrapped_closure*)gpr_malloc(sizeof(*wc)); - wc->cb = cb; - wc->cb_arg = cb_arg; -#ifndef NDEBUG - grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler); -#else - grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); -#endif - return &wc->wrapper; -} - -#ifndef NDEBUG -void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* c, grpc_error* error) { -#else -void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, - grpc_error* error) { -#endif - GPR_TIMER_BEGIN("grpc_closure_run", 0); - if (c != nullptr) { -#ifndef NDEBUG - c->file_initiated = file; - c->line_initiated = line; - c->run = true; -#endif - assert(c->cb); - c->scheduler->vtable->run(exec_ctx, c, error); - } else { - GRPC_ERROR_UNREF(error); - } - GPR_TIMER_END("grpc_closure_run", 0); -} - -#ifndef NDEBUG -void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* c, grpc_error* error) { -#else -void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, - grpc_error* error) { -#endif - GPR_TIMER_BEGIN("grpc_closure_sched", 0); - if (c != nullptr) { -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; -#endif - assert(c->cb); - c->scheduler->vtable->sched(exec_ctx, c, error); - } else { - GRPC_ERROR_UNREF(error); - } - GPR_TIMER_END("grpc_closure_sched", 0); -} - -#ifndef NDEBUG -void grpc_closure_list_sched(const char* file, int line, - grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { -#else -void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { -#endif - grpc_closure* c = list->head; - while (c != nullptr) { - grpc_closure* next = c->next_data.next; -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; -#endif - assert(c->cb); - c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); - c = next; - } - list->head = list->tail = nullptr; -} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8b1188e2db..46793dd2c5 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -21,21 +21,19 @@ #include <grpc/support/port_platform.h> +#include <assert.h> #include <grpc/impl/codegen/exec_ctx_fwd.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include <stdbool.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/mpscq.h" -#ifdef __cplusplus -extern "C" { -#endif - struct grpc_closure; typedef struct grpc_closure grpc_closure; -#ifndef NDEBUG -extern grpc_tracer_flag grpc_trace_closure; -#endif +extern grpc_core::DebugOnlyTraceFlag grpc_trace_closure; typedef struct grpc_closure_list { grpc_closure* head; @@ -85,8 +83,8 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void* cb_arg; - /** Scheduler to schedule against: NULL to schedule against current execution - context */ + /** Scheduler to schedule against: nullptr to schedule against current + execution context */ grpc_closure_scheduler* scheduler; /** Once queued, the result of the closure. Before then: scratch space */ @@ -107,102 +105,262 @@ struct grpc_closure { #endif }; +#ifndef NDEBUG +inline grpc_closure* grpc_closure_init(const char* file, int line, + grpc_closure* closure, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#else +inline grpc_closure* grpc_closure_init(grpc_closure* closure, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#endif + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->scheduler = scheduler; +#ifndef NDEBUG + closure->scheduled = false; + closure->file_initiated = nullptr; + closure->line_initiated = 0; + closure->run = false; + closure->file_created = file; + closure->line_created = line; +#endif + return closure; +} + /** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */ #ifndef NDEBUG -grpc_closure* grpc_closure_init(const char* file, int line, - grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler) #else -grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ grpc_closure_init(closure, cb, cb_arg, scheduler) #endif +namespace closure_impl { + +typedef struct { + grpc_iomgr_cb_func cb; + void* cb_arg; + grpc_closure wrapper; +} wrapped_closure; + +inline void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + wrapped_closure* wc = (wrapped_closure*)arg; + grpc_iomgr_cb_func cb = wc->cb; + void* cb_arg = wc->cb_arg; + gpr_free(wc); + cb(exec_ctx, cb_arg, error); +} + +} // namespace closure_impl + +#ifndef NDEBUG +inline grpc_closure* grpc_closure_create(const char* file, int line, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#else +inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#endif + closure_impl::wrapped_closure* wc = + (closure_impl::wrapped_closure*)gpr_malloc(sizeof(*wc)); + wc->cb = cb; + wc->cb_arg = cb_arg; +#ifndef NDEBUG + grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, wc, + scheduler); +#else + grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc, scheduler); +#endif + return &wc->wrapper; +} + /* Create a heap allocated closure: try to avoid except for very rare events */ #ifndef NDEBUG -grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler) #else -grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ grpc_closure_create(cb, cb_arg, scheduler) #endif #define GRPC_CLOSURE_LIST_INIT \ - { NULL, NULL } + { nullptr, nullptr } -void grpc_closure_list_init(grpc_closure_list* list); +inline void grpc_closure_list_init(grpc_closure_list* closure_list) { + closure_list->head = closure_list->tail = nullptr; +} /** add \a closure to the end of \a list and set \a closure's result to \a error Returns true if \a list becomes non-empty */ -bool grpc_closure_list_append(grpc_closure_list* list, grpc_closure* closure, - grpc_error* error); +inline bool grpc_closure_list_append(grpc_closure_list* closure_list, + grpc_closure* closure, grpc_error* error) { + if (closure == nullptr) { + GRPC_ERROR_UNREF(error); + return false; + } + closure->error_data.error = error; + closure->next_data.next = nullptr; + bool was_empty = (closure_list->head == nullptr); + if (was_empty) { + closure_list->head = closure; + } else { + closure_list->tail->next_data.next = closure; + } + closure_list->tail = closure; + return was_empty; +} /** force all success bits in \a list to false */ -void grpc_closure_list_fail_all(grpc_closure_list* list, - grpc_error* forced_failure); +inline void grpc_closure_list_fail_all(grpc_closure_list* list, + grpc_error* forced_failure) { + for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) { + if (c->error_data.error == GRPC_ERROR_NONE) { + c->error_data.error = GRPC_ERROR_REF(forced_failure); + } + } + GRPC_ERROR_UNREF(forced_failure); +} /** append all closures from \a src to \a dst and empty \a src. */ -void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst); +inline void grpc_closure_list_move(grpc_closure_list* src, + grpc_closure_list* dst) { + if (src->head == nullptr) { + return; + } + if (dst->head == nullptr) { + *dst = *src; + } else { + dst->tail->next_data.next = src->head; + dst->tail = src->tail; + } + src->head = src->tail = nullptr; +} /** return whether \a list is empty. */ -bool grpc_closure_list_empty(grpc_closure_list list); +inline bool grpc_closure_list_empty(grpc_closure_list closure_list) { + return closure_list.head == nullptr; +} + +#ifndef NDEBUG +inline void grpc_closure_run(const char* file, int line, + grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#else +inline void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#endif + GPR_TIMER_BEGIN("grpc_closure_run", 0); + if (c != nullptr) { +#ifndef NDEBUG + c->file_initiated = file; + c->line_initiated = line; + c->run = true; +#endif + assert(c->cb); + c->scheduler->vtable->run(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_run", 0); +} /** Run a closure directly. Caller ensures that no locks are being held above. * Note that calling this at the end of a closure callback function itself is * by definition safe. */ #ifndef NDEBUG -void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* closure, grpc_error* error); #define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error) #else -void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure, - grpc_error* error); #define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ grpc_closure_run(exec_ctx, closure, error) #endif +#ifndef NDEBUG +inline void grpc_closure_sched(const char* file, int line, + grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#else +inline void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#endif + GPR_TIMER_BEGIN("grpc_closure_sched", 0); + if (c != nullptr) { +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; +#endif + assert(c->cb); + c->scheduler->vtable->sched(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_sched", 0); +} + /** Schedule a closure to be run. Does not need to be run from a safe point. */ #ifndef NDEBUG -void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* closure, grpc_error* error); #define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error) #else -void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, - grpc_error* error); #define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ grpc_closure_sched(exec_ctx, closure, error) #endif +#ifndef NDEBUG +inline void grpc_closure_list_sched(const char* file, int line, + grpc_exec_ctx* exec_ctx, + grpc_closure_list* list) { +#else +inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, + grpc_closure_list* list) { +#endif + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; +#endif + assert(c->cb); + c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + /** Schedule all closures in a list to be run. Does not need to be run from a * safe point. */ #ifndef NDEBUG -void grpc_closure_list_sched(const char* file, int line, - grpc_exec_ctx* exec_ctx, - grpc_closure_list* closure_list); #define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list) #else -void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, - grpc_closure_list* closure_list); #define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ grpc_closure_list_sched(exec_ctx, closure_list) #endif -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index b28ca3464c..15c009dd77 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -29,14 +29,13 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" -grpc_tracer_flag grpc_combiner_trace = - GRPC_TRACER_INITIALIZER(false, "combiner"); - -#define GRPC_COMBINER_TRACE(fn) \ - do { \ - if (GRPC_TRACER_ON(grpc_combiner_trace)) { \ - fn; \ - } \ +grpc_core::TraceFlag grpc_combiner_trace(false, "combiner"); + +#define GRPC_COMBINER_TRACE(fn) \ + do { \ + if (grpc_combiner_trace.enabled()) { \ + fn; \ + } \ } while (0) #define STATE_UNORPHANED 1 @@ -106,7 +105,7 @@ static void start_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { #ifndef NDEBUG #define GRPC_COMBINER_DEBUG_SPAM(op, delta) \ - if (GRPC_TRACER_ON(grpc_combiner_trace)) { \ + if (grpc_combiner_trace.enabled()) { \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \ "C:%p %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \ gpr_atm_no_barrier_load(&lock->refs.count), \ diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index f8a8b9df62..0c05511331 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -26,10 +26,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/mpscq.h" -#ifdef __cplusplus -extern "C" { -#endif - // Provides serialized access to some resource. // Each action queued on a combiner is executed serially in a borrowed thread. // The actual thread executing actions may change over time (but there will only @@ -65,10 +61,6 @@ grpc_closure_scheduler* grpc_combiner_finally_scheduler(grpc_combiner* lock); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx* exec_ctx); -extern grpc_tracer_flag grpc_combiner_trace; - -#ifdef __cplusplus -} -#endif +extern grpc_core::TraceFlag grpc_combiner_trace; #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 1b0a9e725e..6ab0a6591c 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -26,10 +26,6 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resource_quota.h" -#ifdef __cplusplus -extern "C" { -#endif - /* An endpoint caps a streaming channel between two communicating processes. Examples may be: a tcp socket, <stdin+stdout>, or some shared memory. */ @@ -106,8 +102,4 @@ struct grpc_endpoint { const grpc_endpoint_vtable* vtable; }; -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */ diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index 219eea8550..506ffc88b4 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -21,10 +21,6 @@ #include "src/core/lib/iomgr/endpoint.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef struct { grpc_endpoint* client; grpc_endpoint* server; @@ -33,8 +29,4 @@ typedef struct { grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_channel_args* args); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 581b903f1a..e6d640c106 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -37,10 +37,9 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_error_refcount = - GRPC_TRACER_INITIALIZER(false, "error_refcount"); -#endif +grpc_core::DebugOnlyTraceFlag grpc_trace_error_refcount(false, + "error_refcount"); +grpc_core::DebugOnlyTraceFlag grpc_trace_closure(false, "closure"); static const char* error_int_name(grpc_error_ints key) { switch (key) { @@ -130,7 +129,7 @@ bool grpc_error_is_special(grpc_error* err) { #ifndef NDEBUG grpc_error* grpc_error_ref(grpc_error* err, const char* file, int line) { if (grpc_error_is_special(err)) return err; - if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { + if (grpc_trace_error_refcount.enabled()) { gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d]", err, gpr_atm_no_barrier_load(&err->atomics.refs.count), gpr_atm_no_barrier_load(&err->atomics.refs.count) + 1, file, line); @@ -183,7 +182,7 @@ static void error_destroy(grpc_error* err) { #ifndef NDEBUG void grpc_error_unref(grpc_error* err, const char* file, int line) { if (grpc_error_is_special(err)) return; - if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { + if (grpc_trace_error_refcount.enabled()) { gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d]", err, gpr_atm_no_barrier_load(&err->atomics.refs.count), gpr_atm_no_barrier_load(&err->atomics.refs.count) - 1, file, line); @@ -216,7 +215,7 @@ static uint8_t get_placement(grpc_error** err, size_t size) { *err = (grpc_error*)gpr_realloc( *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t)); #ifndef NDEBUG - if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { + if (grpc_trace_error_refcount.enabled()) { if (*err != orig) { gpr_log(GPR_DEBUG, "realloc %p -> %p", orig, *err); } @@ -329,7 +328,7 @@ grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc, return GRPC_ERROR_OOM; } #ifndef NDEBUG - if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { + if (grpc_trace_error_refcount.enabled()) { gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line); } #endif @@ -411,7 +410,7 @@ static grpc_error* copy_error_and_unref(grpc_error* in) { out = (grpc_error*)gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t)); #ifndef NDEBUG - if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { + if (grpc_trace_error_refcount.enabled()) { gpr_log(GPR_DEBUG, "%p create copying %p", out, in); } #endif diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 8d7aea4872..4759ee0791 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -29,19 +29,13 @@ #include "src/core/lib/debug/trace.h" -#ifdef __cplusplus -extern "C" { -#endif - /// Opaque representation of an error. /// See https://github.com/grpc/grpc/blob/master/doc/core/grpc-error.md for a /// full write up of this object. typedef struct grpc_error grpc_error; -#ifndef NDEBUG -extern grpc_tracer_flag grpc_trace_error_refcount; -#endif +extern grpc_core::DebugOnlyTraceFlag grpc_trace_error_refcount; typedef enum { /// 'errno' from the operating system @@ -205,8 +199,4 @@ bool grpc_log_if_error(const char* what, grpc_error* error, const char* file, #define GRPC_LOG_IF_ERROR(what, error) \ grpc_log_if_error((what), (error), __FILE__, __LINE__) -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h index d5ccbae9e7..6cb09c2cdb 100644 --- a/src/core/lib/iomgr/error_internal.h +++ b/src/core/lib/iomgr/error_internal.h @@ -25,10 +25,6 @@ #include <grpc/support/sync.h> #include "src/core/lib/iomgr/error.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef struct grpc_linked_error grpc_linked_error; struct grpc_linked_error { @@ -62,8 +58,4 @@ struct grpc_error { bool grpc_error_is_special(struct grpc_error* err); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 918bc6f933..0dda1d924c 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -263,11 +263,13 @@ static grpc_fd* fd_create(int fd, const char* name) { if (new_fd == nullptr) { new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd)); + new_fd->read_closure.Init(); + new_fd->write_closure.Init(); } new_fd->fd = fd; - new_fd->read_closure.Init(); - new_fd->write_closure.Init(); + new_fd->read_closure->InitEvent(); + new_fd->write_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -276,7 +278,7 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_asprintf(&fd_name, "%s fd=%d", name, fd); grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); #ifndef NDEBUG - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); } #endif @@ -336,8 +338,8 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd, GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error)); grpc_iomgr_unregister_object(&fd->iomgr_object); - fd->read_closure.Destroy(); - fd->write_closure.Destroy(); + fd->read_closure->DestroyEvent(); + fd->write_closure->DestroyEvent(); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -651,7 +653,7 @@ static grpc_error* do_epoll_wait(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, GRPC_STATS_INC_POLL_EVENTS_RETURNED(exec_ctx, r); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r); } @@ -673,7 +675,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; pollset->begin_refs++; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker); } @@ -692,7 +694,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, retry_lock_neighborhood: gpr_mu_lock(&neighborhood->mu); gpr_mu_lock(&pollset->mu); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", pollset, worker, kick_state_string(worker->state), is_reassigning); @@ -744,7 +746,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, worker->initialized_cv = true; gpr_cv_init(&worker->cv); while (worker->state == UNKICKED && !pollset->shutting_down) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", pollset, worker, kick_state_string(worker->state), pollset->shutting_down); @@ -761,7 +763,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_exec_ctx_invalidate_now(exec_ctx); } - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d " "kicked_without_poller: %d", @@ -806,7 +808,7 @@ static bool check_neighborhood_for_available_poller( case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, " .. choose next poller to be %p", inspect_worker); } @@ -817,7 +819,7 @@ static bool check_neighborhood_for_available_poller( gpr_cv_signal(&inspect_worker->cv); } } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, " .. beaten to choose next poller"); } } @@ -835,7 +837,7 @@ static bool check_neighborhood_for_available_poller( } while (!found_worker && inspect_worker != inspect->root_worker); } if (!found_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect); } inspect->seen_inactive = true; @@ -857,7 +859,7 @@ static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_pollset_worker* worker, grpc_pollset_worker** worker_hdl) { GPR_TIMER_BEGIN("end_worker", 0); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker); } if (worker_hdl != nullptr) *worker_hdl = nullptr; @@ -867,7 +869,7 @@ static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { if (worker->next != worker && worker->next->state == UNKICKED) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); } GPR_ASSERT(worker->next->initialized_cv); @@ -921,7 +923,7 @@ static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, " .. remove worker"); } if (EMPTIED == worker_remove(pollset, worker)) { @@ -993,7 +995,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, GPR_TIMER_BEGIN("pollset_kick", 0); GRPC_STATS_INC_POLLSET_KICK(exec_ctx); grpc_error* ret_err = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_strvec log; gpr_strvec_init(&log); char* tmp; @@ -1026,7 +1028,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, if (root_worker == nullptr) { GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx); pollset->kicked_without_poller = true; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kicked_without_poller"); } goto done; @@ -1034,14 +1036,14 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_pollset_worker* next_worker = root_worker->next; if (root_worker->state == KICKED) { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); } SET_KICK_STATE(root_worker, KICKED); goto done; } else if (next_worker->state == KICKED) { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); } SET_KICK_STATE(next_worker, KICKED); @@ -1052,7 +1054,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load( &g_active_poller)) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kicked %p", root_worker); } SET_KICK_STATE(root_worker, KICKED); @@ -1060,7 +1062,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, goto done; } else if (next_worker->state == UNKICKED) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kicked %p", next_worker); } GPR_ASSERT(next_worker->initialized_cv); @@ -1069,7 +1071,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, goto done; } else if (next_worker->state == DESIGNATED_POLLER) { if (root_worker->state != DESIGNATED_POLLER) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log( GPR_ERROR, " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)", @@ -1083,7 +1085,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, goto done; } else { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker, root_worker); } @@ -1099,7 +1101,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } } else { GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kicked while waking up"); } goto done; @@ -1109,14 +1111,14 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } if (specific_worker->state == KICKED) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. specific worker already kicked"); } goto done; } else if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker); } SET_KICK_STATE(specific_worker, KICKED); @@ -1124,7 +1126,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } else if (specific_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kick active poller"); } SET_KICK_STATE(specific_worker, KICKED); @@ -1132,7 +1134,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, goto done; } else if (specific_worker->initialized_cv) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kick waiting worker"); } SET_KICK_STATE(specific_worker, KICKED); @@ -1140,7 +1142,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, goto done; } else { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, " .. kick non-waiting worker"); } SET_KICK_STATE(specific_worker, KICKED); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.h b/src/core/lib/iomgr/ev_epoll1_linux.h index 3e66747f6c..9a1b96bd45 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.h +++ b/src/core/lib/iomgr/ev_epoll1_linux.h @@ -22,16 +22,8 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" -#ifdef __cplusplus -extern "C" { -#endif - // a polling engine that utilizes a singleton epoll set and turnstile polling const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index bfd2ac4326..62643df697 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -59,10 +59,8 @@ #define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_pollable_refcount = - GRPC_TRACER_INITIALIZER(false, "pollable_refcount"); -#endif +grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, + "pollable_refcount"); /******************************************************************************* * pollable Declarations @@ -263,7 +261,7 @@ static gpr_mu fd_freelist_mu; unref_by(ec, fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -288,8 +286,8 @@ static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { fd->freelist_next = fd_freelist; fd_freelist = fd; - fd->read_closure.Destroy(); - fd->write_closure.Destroy(); + fd->read_closure->DestroyEvent(); + fd->write_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -297,7 +295,7 @@ static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { #ifndef NDEBUG static void unref_by(grpc_exec_ctx* exec_ctx, grpc_fd* fd, int n, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -342,6 +340,8 @@ static grpc_fd* fd_create(int fd, const char* name) { if (new_fd == nullptr) { new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd)); + new_fd->read_closure.Init(); + new_fd->write_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -349,8 +349,8 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - new_fd->read_closure.Init(); - new_fd->write_closure.Init(); + new_fd->read_closure->InitEvent(); + new_fd->write_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -360,7 +360,7 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_asprintf(&fd_name, "%s fd=%d", name, fd); grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); #ifndef NDEBUG - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); } #endif @@ -483,7 +483,7 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { static pollable* pollable_ref(pollable* p) { #else static pollable* pollable_ref(pollable* p, int line, const char* reason) { - if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { + if (grpc_trace_pollable_refcount.enabled()) { int r = (int)gpr_atm_no_barrier_load(&p->refs.count); gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason); @@ -498,7 +498,7 @@ static void pollable_unref(pollable* p) { #else static void pollable_unref(pollable* p, int line, const char* reason) { if (p == nullptr) return; - if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { + if (grpc_trace_pollable_refcount.enabled()) { int r = (int)gpr_atm_no_barrier_load(&p->refs.count); gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason); @@ -516,7 +516,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -558,7 +558,7 @@ static void pollset_global_shutdown(void) { /* pollset->mu must be held while calling this function */ static void pollset_maybe_finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) " "rw=%p (target:NULL) cpsc=%d (target:0)", @@ -581,14 +581,14 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx, grpc_core::mu_guard lock(&p->mu); GPR_ASSERT(specific_worker != nullptr); if (specific_worker->kicked) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); } GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); return GRPC_ERROR_NONE; } if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); } GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); @@ -597,7 +597,7 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx, } if (specific_worker == p->root_worker) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } specific_worker->kicked = true; @@ -606,7 +606,7 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx, } if (specific_worker->initialized_cv) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } specific_worker->kicked = true; @@ -621,7 +621,7 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx, static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", pollset, specific_worker, @@ -631,7 +631,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, if (specific_worker == nullptr) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { if (pollset->root_worker == nullptr) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset); } GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx); @@ -657,7 +657,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next); } } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset); } GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); @@ -765,7 +765,7 @@ static grpc_error* pollable_process_events(grpc_exec_ctx* exec_ctx, struct epoll_event* ev = &pollable_obj->events[n]; void* data_ptr = ev->data.ptr; if (1 & (intptr_t)data_ptr) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr); } append_error(&error, @@ -777,7 +777,7 @@ static grpc_error* pollable_process_events(grpc_exec_ctx* exec_ctx, bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", @@ -805,7 +805,7 @@ static grpc_error* pollable_epoll(grpc_exec_ctx* exec_ctx, pollable* p, grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { char* desc = pollable_desc(p); gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout); gpr_free(desc); @@ -825,7 +825,7 @@ static grpc_error* pollable_epoll(grpc_exec_ctx* exec_ctx, pollable* p, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r); } @@ -893,7 +893,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, worker->initialized_cv = true; gpr_cv_init(&worker->cv); gpr_mu_unlock(&pollset->mu); - if (GRPC_TRACER_ON(grpc_polling_trace) && + if (grpc_polling_trace.enabled() && worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, worker->pollable_obj, worker, @@ -902,18 +902,18 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, while (do_poll && worker->pollable_obj->root_worker != worker) { if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, worker->pollable_obj, worker); } do_poll = false; } else if (worker->kicked) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable_obj, worker); } do_poll = false; - } else if (GRPC_TRACER_ON(grpc_polling_trace) && + } else if (grpc_polling_trace.enabled() && worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, worker->pollable_obj, worker); @@ -984,7 +984,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, #ifndef NDEBUG WORKER_PTR->originator = gettid(); #endif - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR " kwp=%d pollable=%p", @@ -1027,7 +1027,7 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked( grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* fd) { static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd"; grpc_error* error = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p add fd %p (%d); transition pollable from empty to fd", pollset, fd, fd->fd); @@ -1043,7 +1043,7 @@ static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked( grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* and_add_fd) { static const char* err_desc = "pollset_transition_pollable_from_fd_to_multi"; grpc_error* error = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log( GPR_DEBUG, "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller", @@ -1193,7 +1193,7 @@ static void pollset_set_unref(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss) { static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, grpc_fd* fd) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); } grpc_error* error = GRPC_ERROR_NONE; @@ -1217,7 +1217,7 @@ static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, grpc_fd* fd) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd); } pss = pss_lock_adam(pss); @@ -1238,7 +1238,7 @@ static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, grpc_pollset* ps) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps); } pss = pss_lock_adam(pss); @@ -1289,7 +1289,7 @@ static grpc_error* add_fds_to_pollsets(grpc_exec_ctx* exec_ctx, grpc_fd** fds, static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, grpc_pollset* ps) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps); } grpc_error* error = GRPC_ERROR_NONE; @@ -1326,7 +1326,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, grpc_pollset_set* a, grpc_pollset_set* b) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b); } grpc_error* error = GRPC_ERROR_NONE; @@ -1360,7 +1360,7 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, if (b_size > a_size) { GPR_SWAP(grpc_pollset_set*, a, b); } - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a); } gpr_ref(&a->refs); @@ -1461,10 +1461,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( return nullptr; } -#ifndef NDEBUG - grpc_register_tracer(&grpc_trace_pollable_refcount); -#endif - fd_global_init(); if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { diff --git a/src/core/lib/iomgr/ev_epollex_linux.h b/src/core/lib/iomgr/ev_epollex_linux.h index 22b536c7d4..ffa7fc7f32 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.h +++ b/src/core/lib/iomgr/ev_epollex_linux.h @@ -22,15 +22,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" -#ifdef __cplusplus -extern "C" { -#endif - const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 5b4a7ba19c..12c8483b8e 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -54,9 +54,9 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) -#define GRPC_POLLING_TRACE(...) \ - if (GRPC_TRACER_ON(grpc_polling_trace)) { \ - gpr_log(GPR_INFO, __VA_ARGS__); \ +#define GRPC_POLLING_TRACE(...) \ + if (grpc_polling_trace.enabled()) { \ + gpr_log(GPR_INFO, __VA_ARGS__); \ } static int grpc_wakeup_signal = -1; @@ -289,7 +289,7 @@ static void pi_unref(grpc_exec_ctx* exec_ctx, polling_island* pi); #ifndef NDEBUG static void pi_add_ref_dbg(polling_island* pi, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count); gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR @@ -301,7 +301,7 @@ static void pi_add_ref_dbg(polling_island* pi, const char* reason, static void pi_unref_dbg(grpc_exec_ctx* exec_ctx, polling_island* pi, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count); gpr_log(GPR_DEBUG, "Unref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR @@ -733,7 +733,7 @@ static gpr_mu fd_freelist_mu; #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -750,7 +750,7 @@ static void ref_by(grpc_fd* fd, int n) { #ifndef NDEBUG static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -767,8 +767,8 @@ static void unref_by(grpc_fd* fd, int n) { fd_freelist = fd; grpc_iomgr_unregister_object(&fd->iomgr_object); - fd->read_closure.Destroy(); - fd->write_closure.Destroy(); + fd->read_closure->DestroyEvent(); + fd->write_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -819,6 +819,8 @@ static grpc_fd* fd_create(int fd, const char* name) { if (new_fd == nullptr) { new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&new_fd->po.mu); + new_fd->read_closure.Init(); + new_fd->write_closure.Init(); } /* Note: It is not really needed to get the new_fd->po.mu lock here. If this @@ -833,8 +835,8 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; new_fd->orphaned = false; - new_fd->read_closure.Init(); - new_fd->write_closure.Init(); + new_fd->read_closure->InitEvent(); + new_fd->write_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; diff --git a/src/core/lib/iomgr/ev_epollsig_linux.h b/src/core/lib/iomgr/ev_epollsig_linux.h index ca68595734..5b8aba9d9f 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.h +++ b/src/core/lib/iomgr/ev_epollsig_linux.h @@ -22,10 +22,6 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" -#ifdef __cplusplus -extern "C" { -#endif - const grpc_event_engine_vtable* grpc_init_epollsig_linux(bool explicit_request); #ifdef GRPC_LINUX_EPOLL @@ -34,8 +30,4 @@ void* grpc_pollset_get_polling_island(grpc_pollset* ps); bool grpc_are_polling_islands_equal(void* p, void* q); #endif /* defined(GRPC_LINUX_EPOLL) */ -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLSIG_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index f8b9629462..e32e1ba42a 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -288,7 +288,7 @@ cv_fd_table g_cvfds; #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -305,7 +305,7 @@ static void ref_by(grpc_fd* fd, int n) { #ifndef NDEBUG static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -992,7 +992,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, r = grpc_poll_function(pfds, pfd_count, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); } @@ -1016,7 +1016,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset); } work_combine_error( @@ -1026,7 +1026,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, if (watchers[i].fd == nullptr) { fd_end_poll(exec_ctx, &watchers[i], 0, 0, nullptr); } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); @@ -1382,7 +1382,7 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { gpr_thd_options opt = gpr_thd_options_default(); gpr_ref(&g_cvfds.pollcount); gpr_thd_options_set_detached(&opt); - GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); + GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt)); return pargs; } diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h index 626e95bc8f..f6bc624d4f 100644 --- a/src/core/lib/iomgr/ev_poll_posix.h +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -21,15 +21,7 @@ #include "src/core/lib/iomgr/ev_posix.h" -#ifdef __cplusplus -extern "C" { -#endif - const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request); const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 076d2e6bab..031c97564a 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -36,13 +36,9 @@ #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" -grpc_tracer_flag grpc_polling_trace = - GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */ - -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = - GRPC_TRACER_INITIALIZER(false, "fd_refcount"); -#endif +grpc_core::TraceFlag grpc_polling_trace(false, + "polling"); /* Disabled by default */ +grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount"); /** Default poll() function - a pointer so that it can be overridden by some * tests */ @@ -63,8 +59,6 @@ typedef struct { namespace { -extern "C" { - grpc_poll_function_type real_poll_function; int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) { @@ -76,7 +70,6 @@ int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) { return -1; } } -} // extern "C" const grpc_event_engine_vtable* init_non_polling(bool explicit_request) { if (!explicit_request) { @@ -153,8 +146,6 @@ const grpc_event_engine_vtable* grpc_get_event_engine_test_only() { const char* grpc_get_poll_strategy_name() { return g_poll_strategy_name; } void grpc_event_engine_init(void) { - grpc_register_tracer(&grpc_polling_trace); - char* s = gpr_getenv("GRPC_POLL_STRATEGY"); if (s == nullptr) { s = gpr_strdup("all"); diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index d719b8f3c9..16fa10ca56 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -27,11 +27,7 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#ifdef __cplusplus -extern "C" { -#endif - -extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */ +extern grpc_core::TraceFlag grpc_polling_trace; /* Disabled by default */ typedef struct grpc_fd grpc_fd; @@ -162,8 +158,4 @@ extern grpc_poll_function_type grpc_poll_function; void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*); const grpc_event_engine_vtable* grpc_get_event_engine_test_only(); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_windows.cc b/src/core/lib/iomgr/ev_windows.cc index c24dfaeaf7..697697d0b0 100644 --- a/src/core/lib/iomgr/ev_windows.cc +++ b/src/core/lib/iomgr/ev_windows.cc @@ -22,7 +22,7 @@ #include "src/core/lib/debug/trace.h" -grpc_tracer_flag grpc_polling_trace = - GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */ +grpc_core::TraceFlag grpc_polling_trace(false, + "polling"); /* Disabled by default */ #endif // GRPC_WINSOCK_SOCKET diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 27c769a7ed..1777456342 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -25,9 +25,6 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" -#define GRPC_START_TIME_UPDATE_INTERVAL 10000 -extern "C" grpc_tracer_flag grpc_timer_check_trace; - bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx* exec_ctx) { if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { if (exec_ctx->check_ready_to_finish(exec_ctx, @@ -63,7 +60,7 @@ static void exec_ctx_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_error* error) { #ifndef NDEBUG closure->scheduled = false; - if (GRPC_TRACER_ON(grpc_trace_closure)) { + if (grpc_trace_closure.enabled()) { gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", closure, closure->file_created, closure->line_created, closure->run ? "run" : "scheduled", closure->file_initiated, @@ -72,7 +69,7 @@ static void exec_ctx_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure, #endif closure->cb(exec_ctx, closure->cb_arg, error); #ifndef NDEBUG - if (GRPC_TRACER_ON(grpc_trace_closure)) { + if (grpc_trace_closure.enabled()) { gpr_log(GPR_DEBUG, "closure %p finished", closure); } #endif @@ -107,49 +104,16 @@ static void exec_ctx_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -/* This time pair is not entirely thread-safe as store/load of tv_sec and - * tv_nsec are performed separately. However g_start_time do not need to have - * sub-second precision, so it is ok if the value of tv_nsec is off in this - * case. */ -typedef struct time_atm_pair { - gpr_atm tv_sec; - gpr_atm tv_nsec; -} time_atm_pair; - -static time_atm_pair - g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the - // last enum value in - // gpr_clock_type -static grpc_millis g_last_start_time_update; - -static gpr_timespec timespec_from_time_atm_pair(const time_atm_pair* src, - gpr_clock_type clock_type) { - gpr_timespec time; - time.tv_nsec = (int32_t)gpr_atm_no_barrier_load(&src->tv_nsec); - time.tv_sec = (int64_t)gpr_atm_no_barrier_load(&src->tv_sec); - time.clock_type = clock_type; - return time; -} - -static void time_atm_pair_store(time_atm_pair* dst, const gpr_timespec src) { - gpr_atm_no_barrier_store(&dst->tv_sec, src.tv_sec); - gpr_atm_no_barrier_store(&dst->tv_nsec, src.tv_nsec); -} +static gpr_timespec g_start_time; void grpc_exec_ctx_global_init(void) { - for (int i = 0; i < GPR_TIMESPAN; i++) { - time_atm_pair_store(&g_start_time[i], gpr_now((gpr_clock_type)i)); - } - // allows uniform treatment in conversion functions - time_atm_pair_store(&g_start_time[GPR_TIMESPAN], gpr_time_0(GPR_TIMESPAN)); + g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); } void grpc_exec_ctx_global_shutdown(void) {} static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type); - ts = gpr_time_sub(ts, start_time); + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; if (x < 0) return 0; @@ -158,9 +122,7 @@ static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { } static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type); - ts = gpr_time_sub(ts, start_time); + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; @@ -195,41 +157,18 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, if (clock_type == GPR_TIMESPAN) { return gpr_time_from_millis(millis, GPR_TIMESPAN); } - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[clock_type], clock_type); - return gpr_time_add(start_time, gpr_time_from_millis(millis, GPR_TIMESPAN)); + return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type), + gpr_time_from_millis(millis, GPR_TIMESPAN)); } grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { - return timespec_to_atm_round_down(ts); + return timespec_to_atm_round_down( + gpr_convert_clock_type(ts, g_start_time.clock_type)); } grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { - return timespec_to_atm_round_up(ts); -} - -void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx) { - grpc_millis now = grpc_exec_ctx_now(exec_ctx); - grpc_millis last_start_time_update = - gpr_atm_no_barrier_load(&g_last_start_time_update); - - if (now > last_start_time_update && - now - last_start_time_update > GRPC_START_TIME_UPDATE_INTERVAL) { - /* Get the current system time and subtract \a now from it, where \a now is - * the relative time from grpc_init() from monotonic clock. This calibrates - * the time when grpc_exec_ctx_global_init was called based on current - * system clock. */ - gpr_atm_no_barrier_store(&g_last_start_time_update, now); - gpr_timespec real_now = gpr_now(GPR_CLOCK_REALTIME); - gpr_timespec real_start_time = - gpr_time_sub(real_now, gpr_time_from_millis(now, GPR_TIMESPAN)); - time_atm_pair_store(&g_start_time[GPR_CLOCK_REALTIME], real_start_time); - - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "Update realtime clock start time: %" PRId64 "s %dns", - real_start_time.tv_sec, real_start_time.tv_nsec); - } - } + return timespec_to_atm_round_up( + gpr_convert_clock_type(ts, g_start_time.clock_type)); } static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 6035e08361..b415d2c255 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -24,10 +24,6 @@ #include "src/core/lib/iomgr/closure.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef gpr_atm grpc_millis; #define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX @@ -124,10 +120,4 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); -void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx); - -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 6097d66024..fabdbdf934 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -51,8 +51,7 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; GPR_TLS_DECL(g_this_thread_state); -static grpc_tracer_flag executor_trace = - GRPC_TRACER_INITIALIZER(false, "executor"); +grpc_core::TraceFlag executor_trace(false, "executor"); static void executor_thread(void* arg); @@ -63,7 +62,7 @@ static size_t run_closures(grpc_exec_ctx* exec_ctx, grpc_closure_list list) { while (c != nullptr) { grpc_closure* next = c->next_data.next; grpc_error* error = c->error_data.error; - if (GRPC_TRACER_ON(executor_trace)) { + if (executor_trace.enabled()) { #ifndef NDEBUG gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, c->file_created, c->line_created); @@ -105,8 +104,8 @@ void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) { gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], - &opt); + gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread, + &g_thread_state[0], &opt); } else { if (cur_threads == 0) return; for (size_t i = 0; i < g_max_threads; i++) { @@ -134,7 +133,6 @@ void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) { } void grpc_executor_init(grpc_exec_ctx* exec_ctx) { - grpc_register_tracer(&executor_trace); gpr_atm_no_barrier_store(&g_cur_threads, 0); grpc_executor_set_threading(exec_ctx, true); } @@ -152,7 +150,7 @@ static void executor_thread(void* arg) { size_t subtract_depth = 0; for (;;) { - if (GRPC_TRACER_ON(executor_trace)) { + if (executor_trace.enabled()) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", (int)(ts - g_thread_state), subtract_depth); } @@ -163,7 +161,7 @@ static void executor_thread(void* arg) { gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (ts->shutdown) { - if (GRPC_TRACER_ON(executor_trace)) { + if (executor_trace.enabled()) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown", (int)(ts - g_thread_state)); } @@ -174,7 +172,7 @@ static void executor_thread(void* arg) { grpc_closure_list exec = ts->elems; ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); - if (GRPC_TRACER_ON(executor_trace)) { + if (executor_trace.enabled()) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } @@ -196,7 +194,7 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure, retry_push = false; size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); if (cur_thread_count == 0) { - if (GRPC_TRACER_ON(executor_trace)) { + if (executor_trace.enabled()) { #ifndef NDEBUG gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline", closure, closure->file_created, closure->line_created); @@ -217,7 +215,7 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure, bool try_new_thread; for (;;) { - if (GRPC_TRACER_ON(executor_trace)) { + if (executor_trace.enabled()) { #ifndef NDEBUG gpr_log( GPR_DEBUG, @@ -265,8 +263,8 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure, gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, - &g_thread_state[cur_thread_count], &opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor", + executor_thread, &g_thread_state[cur_thread_count], &opt); } gpr_spinlock_unlock(&g_adding_thread_lock); } diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 8418ace06e..d349083eeb 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,10 +21,6 @@ #include "src/core/lib/iomgr/closure.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG @@ -49,8 +45,4 @@ bool grpc_executor_is_threaded(); grpc_executor_shutdown */ void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool enable); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc new file mode 100644 index 0000000000..f3cfd141b6 --- /dev/null +++ b/src/core/lib/iomgr/fork_posix.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_FORK + +#include <string.h> + +#include <grpc/fork.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/fork.h" +#include "src/core/lib/support/thd_internal.h" +#include "src/core/lib/surface/init.h" + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_prefork() { + if (!grpc_fork_support_enabled()) { + gpr_log(GPR_ERROR, + "Fork support not enabled; try running with the " + "environment variable GRPC_ENABLE_FORK_SUPPORT=1"); + return; + } + if (grpc_is_initialized()) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_timer_manager_set_threading(false); + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); + if (!gpr_await_threads( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(3, GPR_TIMESPAN)))) { + gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); + } + } +} + +void grpc_postfork_parent() { + if (grpc_is_initialized()) { + grpc_timer_manager_set_threading(true); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, true); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_postfork_child() { + if (grpc_is_initialized()) { + grpc_timer_manager_set_threading(true); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, true); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_fork_handlers_auto_register() { + if (grpc_fork_support_enabled()) { +#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK + pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child); +#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK + } +} + +#endif // GRPC_POSIX_FORK diff --git a/src/core/lib/iomgr/fork_windows.cc b/src/core/lib/iomgr/fork_windows.cc new file mode 100644 index 0000000000..f9986f33c7 --- /dev/null +++ b/src/core/lib/iomgr/fork_windows.cc @@ -0,0 +1,39 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifndef GRPC_POSIX_FORK + +#include <grpc/fork.h> +#include <grpc/support/log.h> + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_prefork() { gpr_log(GPR_ERROR, "Forking not supported on Windows"); } + +void grpc_postfork_parent() {} + +void grpc_postfork_child() {} + +void grpc_fork_handlers_auto_register() {} + +#endif // GRPC_POSIX_FORK diff --git a/src/core/lib/iomgr/gethostname.h b/src/core/lib/iomgr/gethostname.h index 2e65b5ffbf..9f10b4afa7 100644 --- a/src/core/lib/iomgr/gethostname.h +++ b/src/core/lib/iomgr/gethostname.h @@ -19,16 +19,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H #define GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H -#ifdef __cplusplus -extern "C" { -#endif - // Returns the hostname of the local machine. // Caller takes ownership of result. char* grpc_gethostname(); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H */ diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h index d112c50538..0e9c3481f7 100644 --- a/src/core/lib/iomgr/iocp_windows.h +++ b/src/core/lib/iomgr/iocp_windows.h @@ -27,10 +27,6 @@ #include "src/core/lib/iomgr/socket_windows.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef enum { GRPC_IOCP_WORK_WORK, GRPC_IOCP_WORK_TIMEOUT, @@ -45,10 +41,6 @@ void grpc_iocp_flush(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket*); -#ifdef __cplusplus -} -#endif - #endif #endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */ diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index d1549c8c63..2f00c0343d 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -22,10 +22,6 @@ #include <grpc/impl/codegen/exec_ctx_fwd.h> #include "src/core/lib/iomgr/port.h" -#ifdef __cplusplus -extern "C" { -#endif - /** Initializes the iomgr. */ void grpc_iomgr_init(grpc_exec_ctx* exec_ctx); @@ -36,8 +32,4 @@ void grpc_iomgr_start(grpc_exec_ctx* exec_ctx); * exec_ctx. */ void grpc_iomgr_shutdown(grpc_exec_ctx* exec_ctx); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */ diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index b818c68da0..20b3cb70d0 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -23,10 +23,6 @@ #include "src/core/lib/iomgr/iomgr.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef struct grpc_iomgr_object { char* name; struct grpc_iomgr_object* next; @@ -44,8 +40,4 @@ void grpc_iomgr_platform_shutdown(void); bool grpc_iomgr_abort_on_leaks(void); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index f5875a247e..f8f6fe2353 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -28,7 +28,6 @@ void grpc_iomgr_platform_init(void) { grpc_wakeup_fd_global_init(); grpc_event_engine_init(); - grpc_register_tracer(&grpc_tcp_trace); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.cc b/src/core/lib/iomgr/iomgr_uv.cc index df5d23af3b..b8a10f2ae8 100644 --- a/src/core/lib/iomgr/iomgr_uv.cc +++ b/src/core/lib/iomgr/iomgr_uv.cc @@ -31,7 +31,7 @@ gpr_thd_id g_init_thread; void grpc_iomgr_platform_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); - grpc_register_tracer(&grpc_tcp_trace); + grpc_executor_set_threading(&exec_ctx, false); g_init_thread = gpr_thd_currentid(); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h index bc42ca8c1c..3b4daaa73b 100644 --- a/src/core/lib/iomgr/iomgr_uv.h +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -23,18 +23,10 @@ #include <grpc/support/thd.h> -#ifdef __cplusplus -extern "C" { -#endif - /* The thread ID of the thread on which grpc was initialized. Used to verify * that all calls into libuv are made on that same thread */ extern gpr_thd_id g_init_thread; -#ifdef __cplusplus -} -#endif - #ifdef GRPC_UV_THREAD_CHECK #define GRPC_UV_ASSERT_SAME_THREAD() \ GPR_ASSERT(gpr_thd_currentid() == g_init_thread) diff --git a/src/core/lib/iomgr/load_file.h b/src/core/lib/iomgr/load_file.h index 5b367c189d..a7336527ce 100644 --- a/src/core/lib/iomgr/load_file.h +++ b/src/core/lib/iomgr/load_file.h @@ -25,17 +25,9 @@ #include "src/core/lib/iomgr/error.h" -#ifdef __cplusplus -extern "C" { -#endif - /* Loads the content of a file into a slice. add_null_terminator will add a NULL terminator if non-zero. */ grpc_error* grpc_load_file(const char* filename, int add_null_terminator, grpc_slice* slice); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_LOAD_FILE_H */ diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc index 40e2ed6219..f0e798e8d8 100644 --- a/src/core/lib/iomgr/lockfree_event.cc +++ b/src/core/lib/iomgr/lockfree_event.cc @@ -22,7 +22,7 @@ #include "src/core/lib/debug/trace.h" -extern grpc_tracer_flag grpc_polling_trace; +extern grpc_core::TraceFlag grpc_polling_trace; /* 'state' holds the to call when the fd is readable or writable respectively. It can contain one of the following values: @@ -57,7 +57,9 @@ extern grpc_tracer_flag grpc_polling_trace; namespace grpc_core { -LockfreeEvent::LockfreeEvent() { +LockfreeEvent::LockfreeEvent() { InitEvent(); } + +void LockfreeEvent::InitEvent() { /* Perform an atomic store to start the state machine. Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed @@ -67,7 +69,7 @@ LockfreeEvent::LockfreeEvent() { gpr_atm_no_barrier_store(&state_, kClosureNotReady); } -LockfreeEvent::~LockfreeEvent() { +void LockfreeEvent::DestroyEvent() { gpr_atm curr; do { curr = gpr_atm_no_barrier_load(&state_); @@ -86,7 +88,7 @@ LockfreeEvent::~LockfreeEvent() { void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) { while (true) { gpr_atm curr = gpr_atm_no_barrier_load(&state_); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this, (void*)curr, closure); } @@ -153,7 +155,7 @@ bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx, while (true) { gpr_atm curr = gpr_atm_no_barrier_load(&state_); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "LockfreeEvent::SetShutdown: %p curr=%p err=%s", &state_, (void*)curr, grpc_error_string(shutdown_err)); } @@ -202,7 +204,7 @@ void LockfreeEvent::SetReady(grpc_exec_ctx* exec_ctx) { while (true) { gpr_atm curr = gpr_atm_no_barrier_load(&state_); - if (GRPC_TRACER_ON(grpc_polling_trace)) { + if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "LockfreeEvent::SetReady: %p curr=%p", &state_, (void*)curr); } diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h index c667dcd3bc..aec67a3399 100644 --- a/src/core/lib/iomgr/lockfree_event.h +++ b/src/core/lib/iomgr/lockfree_event.h @@ -30,11 +30,16 @@ namespace grpc_core { class LockfreeEvent { public: LockfreeEvent(); - ~LockfreeEvent(); LockfreeEvent(const LockfreeEvent&) = delete; LockfreeEvent& operator=(const LockfreeEvent&) = delete; + // These methods are used to initialize and destroy the internal state. These + // cannot be done in constructor and destructor because SetReady may be called + // when the event is destroyed and put in a freelist. + void InitEvent(); + void DestroyEvent(); + bool IsShutdown() const { return (gpr_atm_no_barrier_load(&state_) & kShutdownBit) != 0; } diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index 867e085153..dbe579e60d 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -22,10 +22,6 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef enum grpc_pollset_tag { GRPC_POLLS_NONE, GRPC_POLLS_POLLSET, @@ -68,8 +64,5 @@ void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx* exec_ctx, void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx* exec_ctx, grpc_polling_entity* pollent, grpc_pollset_set* pss_dst); -#ifdef __cplusplus -} -#endif #endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index c99b930e8e..d5d78f3101 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -25,13 +25,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" -#ifdef __cplusplus -extern "C" { -#endif - -#ifndef NDEBUG -extern grpc_tracer_flag grpc_trace_fd_refcount; -#endif +extern grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount; /* A grpc_pollset is a set of file descriptors that a higher level item is interested in. For example: @@ -84,8 +78,4 @@ grpc_error* grpc_pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_pollset_worker* specific_worker) GRPC_MUST_USE_RESULT; -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */ diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h index 0167a50a56..089c15cc94 100644 --- a/src/core/lib/iomgr/pollset_set.h +++ b/src/core/lib/iomgr/pollset_set.h @@ -21,10 +21,6 @@ #include "src/core/lib/iomgr/pollset.h" -#ifdef __cplusplus -extern "C" { -#endif - /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any fd's (etc) that have been registered with the set_set to that pollset. @@ -48,8 +44,4 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, grpc_pollset_set* bag, grpc_pollset_set* item); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */ diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index 1d54942c1d..16132f3a80 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -34,10 +34,7 @@ #include "src/core/lib/debug/trace.h" -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = - GRPC_TRACER_INITIALIZER(false, "fd_refcount"); -#endif +grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount"); struct grpc_pollset { uv_timer_t* timer; diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index 5cc9faf4ff..566c110ca6 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -19,17 +19,9 @@ #ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H #define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H -#ifdef __cplusplus -extern "C" { -#endif - extern int grpc_pollset_work_run_loop; void grpc_pollset_global_init(void); void grpc_pollset_global_shutdown(void); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */ diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index 5998b3f5bc..95dd7d7ddd 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -30,10 +30,7 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = - GRPC_TRACER_INITIALIZER(false, "fd_refcount"); -#endif +grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount"); gpr_mu grpc_polling_mu; static grpc_pollset_worker* g_active_poller; diff --git a/src/core/lib/iomgr/pollset_windows.h b/src/core/lib/iomgr/pollset_windows.h index f6da9da601..93fe7d669b 100644 --- a/src/core/lib/iomgr/pollset_windows.h +++ b/src/core/lib/iomgr/pollset_windows.h @@ -26,10 +26,6 @@ #ifdef GRPC_WINSOCK_SOCKET #include "src/core/lib/iomgr/socket_windows.h" -#ifdef __cplusplus -extern "C" { -#endif - /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. A Windows "pollset" is merely a mutex used to synchronize with the IOCP, and workers are condition variables @@ -67,10 +63,6 @@ struct grpc_pollset { void grpc_pollset_global_init(void); void grpc_pollset_global_shutdown(void); -#ifdef __cplusplus -} -#endif - #endif #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 1cc6d98491..9fae8c0052 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -30,6 +30,7 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -59,6 +60,7 @@ #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -90,6 +92,7 @@ #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_MSG_IOVLEN_TYPE int +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -103,6 +106,7 @@ #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 847e10f177..5105020404 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -25,10 +25,6 @@ #define GRPC_MAX_SOCKADDR_SIZE 128 -#ifdef __cplusplus -extern "C" { -#endif - typedef struct { char addr[GRPC_MAX_SOCKADDR_SIZE]; size_t len; @@ -56,8 +52,4 @@ extern grpc_error* (*grpc_blocking_resolve_address)( const char* name, const char* default_port, grpc_resolved_addresses** addresses); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 9a44fa203c..ccd8d9f379 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -31,8 +31,7 @@ #include "src/core/lib/iomgr/combiner.h" -grpc_tracer_flag grpc_resource_quota_trace = - GRPC_TRACER_INITIALIZER(false, "resource_quota"); +grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota"); #define MEMORY_USAGE_ESTIMATION_MAX 65536 @@ -293,7 +292,7 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx, while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR " free_pool=%" PRId64, @@ -319,14 +318,14 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx, resource_user->free_pool = 0; resource_quota->free_pool -= amt; rq_update_estimate(resource_quota); - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64 " bytes; rq_free_pool -> %" PRId64, resource_quota->name, resource_user->name, amt, resource_quota->free_pool); } - } else if (GRPC_TRACER_ON(grpc_resource_quota_trace) && + } else if (grpc_resource_quota_trace.enabled() && resource_user->free_pool >= 0) { gpr_log(GPR_DEBUG, "RQ %s %s: discard already satisfied alloc request", resource_quota->name, resource_user->name); @@ -357,7 +356,7 @@ static bool rq_reclaim_from_per_user_free_pool( resource_user->free_pool = 0; resource_quota->free_pool += amt; rq_update_estimate(resource_quota); - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -381,7 +380,7 @@ static bool rq_reclaim(grpc_exec_ctx* exec_ctx, : GRPC_RULIST_RECLAIMER_BENIGN; grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list); if (resource_user == nullptr) return false; - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation", resource_quota->name, resource_user->name, destructive ? "destructive" : "benign"); @@ -515,7 +514,7 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, void* ru, } static void ru_shutdown(grpc_exec_ctx* exec_ctx, void* ru, grpc_error* error) { - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RU shutdown %p", ru); } grpc_resource_user* resource_user = (grpc_resource_user*)ru; @@ -813,7 +812,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx* exec_ctx, ru_ref_by(resource_user, (gpr_atm)size); resource_user->free_pool -= (int64_t)size; resource_user->outstanding_allocations += (int64_t)size; - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->free_pool); @@ -838,7 +837,7 @@ void grpc_resource_user_free(grpc_exec_ctx* exec_ctx, gpr_mu_lock(&resource_user->mu); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += (int64_t)size; - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->free_pool); @@ -867,7 +866,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx* exec_ctx, void grpc_resource_user_finish_reclamation(grpc_exec_ctx* exec_ctx, grpc_resource_user* resource_user) { - if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } @@ -896,10 +895,3 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length, &slice_allocator->on_allocated); } - -grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, - size_t size) { - grpc_resource_user_alloc(exec_ctx, resource_user, size, nullptr); - return ru_slice_create(resource_user, size); -} diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index fcdf9c2de5..787370307a 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -24,10 +24,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/exec_ctx.h" -#ifdef __cplusplus -extern "C" { -#endif - /** \file Tracks resource usage against a pool. The current implementation tracks only memory usage, but in the future @@ -65,7 +61,7 @@ extern "C" { maintain lists of users (which users arrange to leave before they are destroyed) */ -extern grpc_tracer_flag grpc_resource_quota_trace; +extern grpc_core::TraceFlag grpc_resource_quota_trace; grpc_resource_quota* grpc_resource_quota_ref_internal( grpc_resource_quota* resource_quota); @@ -154,8 +150,4 @@ grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx* exec_ctx, grpc_resource_user* resource_user, size_t size); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ diff --git a/src/core/lib/iomgr/sockaddr_utils.cc b/src/core/lib/iomgr/sockaddr_utils.cc index 3477fb52cd..0c0a2fe5b2 100644 --- a/src/core/lib/iomgr/sockaddr_utils.cc +++ b/src/core/lib/iomgr/sockaddr_utils.cc @@ -148,7 +148,7 @@ int grpc_sockaddr_to_string(char** out, grpc_resolved_address addr_normalized; char ntop_buf[INET6_ADDRSTRLEN]; const void* ip = nullptr; - int port; + int port = 0; uint32_t sin6_scope_id = 0; int ret; diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 090470d49e..e3bd51a4ad 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -21,10 +21,6 @@ #include "src/core/lib/iomgr/resolve_address.h" -#ifdef __cplusplus -extern "C" { -#endif - /* Returns true if addr is an IPv4-mapped IPv6 address within the ::ffff:0.0.0.0/96 range, or false otherwise. @@ -81,8 +77,4 @@ const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* addr); int grpc_sockaddr_get_family(const grpc_resolved_address* resolved_addr); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_factory_posix.h b/src/core/lib/iomgr/socket_factory_posix.h index e8257b07c4..af57cc5b60 100644 --- a/src/core/lib/iomgr/socket_factory_posix.h +++ b/src/core/lib/iomgr/socket_factory_posix.h @@ -23,10 +23,6 @@ #include <grpc/support/sync.h> #include "src/core/lib/iomgr/resolve_address.h" -#ifdef __cplusplus -extern "C" { -#endif - /** The virtual table of grpc_socket_factory */ typedef struct { /** Replacement for socket(2) */ @@ -68,8 +64,4 @@ int grpc_socket_factory_compare(grpc_socket_factory* a, grpc_socket_factory* b); grpc_socket_factory* grpc_socket_factory_ref(grpc_socket_factory* factory); void grpc_socket_factory_unref(grpc_socket_factory* factory); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h index b4103f7e93..0a97cf657f 100644 --- a/src/core/lib/iomgr/socket_mutator.h +++ b/src/core/lib/iomgr/socket_mutator.h @@ -24,10 +24,6 @@ #include <stdbool.h> -#ifdef __cplusplus -extern "C" { -#endif - /** The virtual table of grpc_socket_mutator */ typedef struct { /** Mutates the socket opitons of \a fd */ @@ -60,8 +56,4 @@ int grpc_socket_mutator_compare(grpc_socket_mutator* a, grpc_socket_mutator* b); grpc_socket_mutator* grpc_socket_mutator_ref(grpc_socket_mutator* mutator); void grpc_socket_mutator_unref(grpc_socket_mutator* mutator); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */ diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h index 4816ab6be7..9fd141b6de 100644 --- a/src/core/lib/iomgr/socket_utils.h +++ b/src/core/lib/iomgr/socket_utils.h @@ -21,15 +21,7 @@ #include <stddef.h> -#ifdef __cplusplus -extern "C" { -#endif - /* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */ const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 7a9c8139e7..77df4205ff 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -29,10 +29,6 @@ #include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" -#ifdef __cplusplus -extern "C" { -#endif - /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec); @@ -133,8 +129,4 @@ grpc_error* grpc_create_dualstack_socket_using_factory( grpc_socket_factory* factory, const grpc_resolved_address* addr, int type, int protocol, grpc_dualstack_mode* dsmode, int* newfd); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index c3ad99d82f..04e0a89d70 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -31,10 +31,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" -#ifdef __cplusplus -extern "C" { -#endif - /* This holds the data for an outstanding read or write on a socket. The mutex to protect the concurrent access to that data is the one inside the winsocket wrapper. */ @@ -114,10 +110,6 @@ void grpc_socket_become_ready(grpc_exec_ctx* exec_ctx, grpc_winsocket* winsocket, grpc_winsocket_callback_info* ci); -#ifdef __cplusplus -} -#endif - #endif #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index c18d8a9316..75e2fe0f36 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -25,10 +25,6 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" -#ifdef __cplusplus -extern "C" { -#endif - /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). @@ -41,8 +37,4 @@ void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* on_connect, const grpc_resolved_address* addr, grpc_millis deadline); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index cb0f627c94..4cb2ac49d5 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -43,7 +43,7 @@ #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" -extern grpc_tracer_flag grpc_tcp_trace; +extern grpc_core::TraceFlag grpc_tcp_trace; typedef struct { gpr_mu mu; @@ -99,7 +99,7 @@ done: static void tc_on_alarm(grpc_exec_ctx* exec_ctx, void* acp, grpc_error* error) { int done; async_connect* ac = (async_connect*)acp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str, str); @@ -138,7 +138,7 @@ static void on_writable(grpc_exec_ctx* exec_ctx, void* acp, grpc_error* error) { GRPC_ERROR_REF(error); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: error=%s", ac->addr_str, str); @@ -317,7 +317,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx* exec_ctx, grpc_schedule_on_exec_ctx); ac->channel_args = grpc_channel_args_copy(channel_args); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting fd %p", ac->addr_str, fdobj); } @@ -334,13 +334,11 @@ done: } // overridden by api_fuzzer.c -extern "C" { void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_millis deadline) = tcp_client_connect_impl; -} void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h index 13d917891e..2b1fe79e90 100644 --- a/src/core/lib/iomgr/tcp_client_posix.h +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -23,16 +23,8 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/tcp_client.h" -#ifdef __cplusplus -extern "C" { -#endif - grpc_endpoint* grpc_tcp_client_create_from_fd( grpc_exec_ctx* exec_ctx, grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc index 15345c8091..5cca0c9936 100644 --- a/src/core/lib/iomgr/tcp_client_uv.cc +++ b/src/core/lib/iomgr/tcp_client_uv.cc @@ -32,7 +32,7 @@ #include "src/core/lib/iomgr/tcp_uv.h" #include "src/core/lib/iomgr/timer.h" -extern grpc_tracer_flag grpc_tcp_trace; +extern grpc_core::TraceFlag grpc_tcp_trace; typedef struct grpc_uv_tcp_connect { uv_connect_t connect_req; @@ -59,7 +59,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx* exec_ctx, void* acp, grpc_error* error) { int done; grpc_uv_tcp_connect* connect = (grpc_uv_tcp_connect*)acp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", connect->addr_name, str); @@ -147,7 +147,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx* exec_ctx, connect->connect_req.data = connect; connect->refs = 2; // One for the connect operation, one for the timer. - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name); } @@ -161,13 +161,11 @@ static void tcp_client_connect_impl(grpc_exec_ctx* exec_ctx, } // overridden by api_fuzzer.c -extern "C" { void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_millis deadline) = tcp_client_connect_impl; -} void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index 103e6b78de..5e30725e90 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -226,13 +226,11 @@ failure: } // overridden by api_fuzzer.c -extern "C" { void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_millis deadline) = tcp_client_connect_impl; -} void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index cb90933e31..d09cfca9af 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -61,7 +61,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; typedef size_t msg_iovlen_type; #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); +grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); typedef struct { grpc_endpoint base; @@ -81,9 +81,7 @@ typedef struct { grpc_slice_buffer* incoming_buffer; grpc_slice_buffer* outgoing_buffer; - /** slice within outgoing_buffer to write next */ - size_t outgoing_slice_idx; - /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ + /** byte within outgoing_buffer->slices[0] to write next */ size_t outgoing_byte_idx; grpc_closure* read_cb; @@ -121,7 +119,7 @@ static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx, static void done_poller(grpc_exec_ctx* exec_ctx, void* bp, grpc_error* error_ignored) { backup_poller* p = (backup_poller*)bp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); } grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p)); @@ -131,7 +129,7 @@ static void done_poller(grpc_exec_ctx* exec_ctx, void* bp, static void run_poller(grpc_exec_ctx* exec_ctx, void* bp, grpc_error* error_ignored) { backup_poller* p = (backup_poller*)bp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); @@ -147,18 +145,18 @@ static void run_poller(grpc_exec_ctx* exec_ctx, void* bp, gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) { gpr_mu_lock(p->pollset_mu); bool cas_ok = gpr_atm_full_cas(&g_backup_poller, (gpr_atm)p, 0); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); } gpr_mu_unlock(p->pollset_mu); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); } grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, grpc_schedule_on_exec_ctx)); } else { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); } GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); @@ -169,7 +167,7 @@ static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { backup_poller* p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller); gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, (int)old_count - 1); } @@ -180,14 +178,14 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { backup_poller* p; gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count, 2 + (int)old_count); } if (old_count == 0) { GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); p = (backup_poller*)gpr_zalloc(sizeof(*p) + grpc_pollset_size()); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); } grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); @@ -203,7 +201,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { // spin waiting for backup poller } } - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); } grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); @@ -213,7 +211,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { } static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); } GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, @@ -222,7 +220,7 @@ static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { } static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); } cover_self(exec_ctx, tcp); @@ -234,7 +232,7 @@ static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); } drop_uncovered(exec_ctx, (grpc_tcp*)arg); @@ -311,7 +309,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -324,7 +322,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -355,7 +353,7 @@ static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, grpc_error* error) { grpc_closure* cb = tcp->read_cb; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; const char* str = grpc_error_string(error); @@ -451,7 +449,7 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)tcpp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, grpc_error_string(error)); } @@ -470,13 +468,13 @@ static void tcp_continue_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); } grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, target_read_size, 1, tcp->incoming_buffer); } else { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); } tcp_do_read(exec_ctx, tcp); @@ -487,7 +485,7 @@ static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)arg; GPR_ASSERT(!tcp->finished_edge); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); } @@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, size_t unwind_slice_idx; size_t unwind_byte_idx; + // We always start at zero, because we eagerly unref and trim the slice + // buffer as we write + size_t outgoing_slice_idx = 0; + for (;;) { sending_length = 0; - unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_slice_idx = outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; - for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && + for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count && iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GRPC_SLICE_START_PTR( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + + tcp->outgoing_buffer->slices[outgoing_slice_idx]) + tcp->outgoing_byte_idx; iov[iov_size].iov_len = - GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) - tcp->outgoing_byte_idx; sending_length += iov[iov_size].iov_len; - tcp->outgoing_slice_idx++; + outgoing_slice_idx++; tcp->outgoing_byte_idx = 0; } GPR_ASSERT(iov_size > 0); @@ -574,16 +575,25 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (sent_length < 0) { if (errno == EAGAIN) { - tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; + // unref all and forget about all slices that have been written to this + // point + for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { + grpc_slice_unref_internal( + exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer)); + } return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } } @@ -593,9 +603,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, while (trailing > 0) { size_t slice_length; - tcp->outgoing_slice_idx--; - slice_length = GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + outgoing_slice_idx--; + slice_length = + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]); if (slice_length > trailing) { tcp->outgoing_byte_idx = slice_length - trailing; break; @@ -604,11 +614,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, } } - if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { + if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } - }; + } } static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, @@ -625,14 +637,14 @@ static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, } if (!tcp_flush(exec_ctx, tcp, &error)) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "write: delayed"); } notify_on_write(exec_ctx, tcp); } else { cb = tcp->write_cb; tcp->write_cb = nullptr; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } @@ -647,7 +659,7 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_tcp* tcp = (grpc_tcp*)ep; grpc_error* error = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { size_t i; for (i = 0; i < buf->count; i++) { @@ -672,18 +684,17 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, return; } tcp->outgoing_buffer = buf; - tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; if (!tcp_flush(exec_ctx, tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "write: delayed"); } notify_on_write(exec_ctx, tcp); } else { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index ff1060b0ff..09051b7ed6 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -33,11 +33,7 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" -#ifdef __cplusplus -extern "C" { -#endif - -extern grpc_tracer_flag grpc_tcp_trace; +extern grpc_core::TraceFlag grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ @@ -57,8 +53,4 @@ int grpc_tcp_fd(grpc_endpoint* ep); void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, int* fd, grpc_closure* done); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index ef983199b8..a1757a2b3e 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -25,10 +25,6 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" -#ifdef __cplusplus -extern "C" { -#endif - /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; @@ -102,8 +98,4 @@ void grpc_tcp_server_unref(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s); void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */ diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index f84fa9751d..6fed13c6c7 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -243,7 +243,7 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* err) { addr_str = grpc_sockaddr_to_uri(&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 608fba3346..6046f257f9 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -24,10 +24,6 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_server.h" -#ifdef __cplusplus -extern "C" { -#endif - /* one listening port */ typedef struct grpc_tcp_listener { int fd; @@ -121,8 +117,4 @@ grpc_error* grpc_tcp_server_prepare_socket(int fd, /* Ruturn true if the platform supports ifaddrs */ bool grpc_tcp_server_have_ifaddrs(void); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 72443cc29e..5139760634 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -55,7 +55,7 @@ static void init_max_accept_queue_size(void) { if (fgets(buf, sizeof buf, fp)) { char* end; long i = strtol(buf, &end, 10); - if (i > 0 && i <= INT_MAX && end && *end == 0) { + if (i > 0 && i <= INT_MAX && end && *end == '\n') { n = (int)i; } } diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc index 0eed4d428f..ffadf0b1ab 100644 --- a/src/core/lib/iomgr/tcp_server_uv.cc +++ b/src/core/lib/iomgr/tcp_server_uv.cc @@ -213,7 +213,7 @@ static void finish_accept(grpc_exec_ctx* exec_ctx, grpc_tcp_listener* sp) { } else { gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err)); } - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { if (peer_name_string) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s", sp->server, peer_name_string); @@ -247,7 +247,7 @@ static void on_connect(uv_stream_t* server, int status) { GPR_ASSERT(!sp->has_pending_connection); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); } @@ -260,15 +260,36 @@ static void on_connect(uv_stream_t* server, int status) { grpc_exec_ctx_finish(&exec_ctx); } -static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle, - const grpc_resolved_address* addr, - unsigned port_index, - grpc_tcp_listener** listener) { +static grpc_error* add_addr_to_server(grpc_tcp_server* s, + const grpc_resolved_address* addr, + unsigned port_index, + grpc_tcp_listener** listener) { grpc_tcp_listener* sp = NULL; int port = -1; int status; grpc_error* error; grpc_resolved_address sockname_temp; + uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); + int family = grpc_sockaddr_get_family(addr); + + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t*)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to initialize UV tcp handle"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + return error; + } // The last argument to uv_tcp_bind is flags status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0); @@ -325,20 +346,48 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle, return GRPC_ERROR_NONE; } +static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s, + unsigned port_index, + int requested_port, + grpc_tcp_listener** listener) { + grpc_resolved_address wild4; + grpc_resolved_address wild6; + grpc_tcp_listener* sp = nullptr; + grpc_tcp_listener* sp2 = nullptr; + grpc_error* v6_err = GRPC_ERROR_NONE; + grpc_error* v4_err = GRPC_ERROR_NONE; + + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); + /* Try listening on IPv6 first. */ + if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) == + GRPC_ERROR_NONE) { + *listener = sp; + return GRPC_ERROR_NONE; + } + + if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) == + GRPC_ERROR_NONE) { + *listener = sp2; + return GRPC_ERROR_NONE; + } + + grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to add any wildcard listeners"); + root_err = grpc_error_add_child(root_err, v6_err); + root_err = grpc_error_add_child(root_err, v4_err); + return root_err; +} + grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s, const grpc_resolved_address* addr, int* port) { // This function is mostly copied from tcp_server_windows.c grpc_tcp_listener* sp = NULL; - uv_tcp_t* handle; grpc_resolved_address addr6_v4mapped; - grpc_resolved_address wildcard; grpc_resolved_address* allocated_addr = NULL; grpc_resolved_address sockname_temp; unsigned port_index = 0; - int status; grpc_error* error = GRPC_ERROR_NONE; - int family; GRPC_UV_ASSERT_SAME_THREAD(); @@ -367,43 +416,20 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s, } } - if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = &addr6_v4mapped; - } - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ if (grpc_sockaddr_is_wildcard(addr, port)) { - grpc_sockaddr_make_wildcard6(*port, &wildcard); - - addr = &wildcard; - } - - handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); - - family = grpc_sockaddr_get_family(addr); - status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); -#if defined(GPR_LINUX) && defined(SO_REUSEPORT) - if (family == AF_INET || family == AF_INET6) { - int fd; - uv_fileno((uv_handle_t*)handle, &fd); - int enable = 1; - setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); - } -#endif /* GPR_LINUX && SO_REUSEPORT */ - - if (status == 0) { - error = add_socket_to_server(s, handle, addr, port_index, &sp); + error = add_wildcard_addrs_to_server(s, port_index, *port, &sp); } else { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Failed to initialize UV tcp handle"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = &addr6_v4mapped; + } + + error = add_addr_to_server(s, addr, port_index, &sp); } gpr_free(allocated_addr); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { char* port_string; grpc_sockaddr_to_string(&port_string, addr, 0); const char* str = grpc_error_string(error); @@ -435,7 +461,7 @@ void grpc_tcp_server_start(grpc_exec_ctx* exec_ctx, grpc_tcp_server* server, (void)pollsets; (void)pollset_count; GRPC_UV_ASSERT_SAME_THREAD(); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "SERVER_START %p", server); } GPR_ASSERT(on_accept_cb); diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index ac9ca4ea11..40f4006203 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -38,7 +38,7 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); +grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); typedef struct { grpc_endpoint base; @@ -52,12 +52,12 @@ typedef struct { grpc_closure* read_cb; grpc_closure* write_cb; - grpc_slice read_slice; grpc_slice_buffer* read_slices; grpc_slice_buffer* write_slices; uv_buf_t* write_buffers; grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; bool shutting_down; @@ -66,7 +66,6 @@ typedef struct { } grpc_tcp; static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->handle); gpr_free(tcp->peer_string); @@ -79,7 +78,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -92,7 +91,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -119,91 +118,117 @@ static void uv_close_callback(uv_handle_t* handle) { grpc_exec_ctx_finish(&exec_ctx); } -static grpc_slice alloc_read_slice(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user) { - return grpc_resource_user_slice_malloc(exec_ctx, resource_user, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); -} - static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp* tcp = (grpc_tcp*)handle->data; (void)suggested_size; - buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slice); - buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); + /* Before calling uv_read_start, we allocate a buffer with exactly one slice + * to tcp->read_slices and wait for the callback indicating that the + * allocation was successful. So slices[0] should always exist here */ + buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]); + buf->len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]); grpc_exec_ctx_finish(&exec_ctx); } +static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, + grpc_error* error) { + grpc_closure* cb = tcp->read_cb; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); + gpr_free(dump); + } + } + tcp->read_slices = NULL; + tcp->read_cb = NULL; + GRPC_CLOSURE_RUN(exec_ctx, cb, error); +} + static void read_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - grpc_slice sub; grpc_error* error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp* tcp = (grpc_tcp*)stream->data; - grpc_closure* cb = tcp->read_cb; + grpc_slice_buffer garbage; if (nread == 0) { // Nothing happened. Wait for the next callback return; } TCP_UNREF(&exec_ctx, tcp, "read"); - tcp->read_cb = NULL; // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); if (nread == UV_EOF) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices); } else if (nread > 0) { // Successful read - sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); - grpc_slice_buffer_add(tcp->read_slices, sub); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); error = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { - size_t i; - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "read: error=%s", str); - - for (i = 0; i < tcp->read_slices->count; i++) { - char* dump = grpc_dump_slice(tcp->read_slices->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, - dump); - gpr_free(dump); - } + if ((size_t)nread < tcp->read_slices->length) { + /* TODO(murgatroid99): Instead of discarding the unused part of the read + * buffer, reuse it as the next read buffer. */ + grpc_slice_buffer_init(&garbage); + grpc_slice_buffer_trim_end( + tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &garbage); } } else { // nread < 0: Error error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices); } - GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); + call_read_cb(&exec_ctx, tcp, error); grpc_exec_ctx_finish(&exec_ctx); } +static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, + grpc_error* error) { + int status; + grpc_tcp* tcp = (grpc_tcp*)tcpp; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } + if (error == GRPC_ERROR_NONE) { + status = + uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + } + } + if (error != GRPC_ERROR_NONE) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->read_slices); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(exec_ctx, tcp, "read"); + } + if (grpc_tcp_trace.enabled()) { + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); + } +} + static void uv_endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_slice_buffer* read_slices, grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; - int status; - grpc_error* error = GRPC_ERROR_NONE; GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices); TCP_REF(tcp, "read"); - // TODO(murgatroid99): figure out what the return value here means - status = - uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); - if (status != 0) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); - GRPC_CLOSURE_SCHED(exec_ctx, cb, error); - } - if (GRPC_TRACER_ON(grpc_tcp_trace)) { - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); - } + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + tcp->read_slices); } static void write_callback(uv_write_t* req, int status) { @@ -218,13 +243,11 @@ static void write_callback(uv_write_t* req, int status) { } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed"); } - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - grpc_resource_user_free(&exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * tcp->write_slices->count); GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -240,7 +263,7 @@ static void uv_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, uv_write_t* write_req; GRPC_UV_ASSERT_SAME_THREAD(); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { size_t j; for (j = 0; j < write_slices->count; j++) { @@ -271,8 +294,6 @@ static void uv_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * buffer_count); - grpc_resource_user_alloc(exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; buffers[i].base = (char*)GRPC_SLICE_START_PTR(*slice); @@ -320,7 +341,7 @@ static void uv_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_error* why) { grpc_tcp* tcp = (grpc_tcp*)ep; if (!tcp->shutting_down) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(why); gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str); } @@ -367,7 +388,7 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle, grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); } @@ -381,8 +402,10 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle, gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; + tcp->read_slices = NULL; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h index 708e8469e6..fd6d19049a 100644 --- a/src/core/lib/iomgr/tcp_uv.h +++ b/src/core/lib/iomgr/tcp_uv.h @@ -38,22 +38,14 @@ #include <uv.h> -extern grpc_tracer_flag grpc_tcp_trace; +extern grpc_core::TraceFlag grpc_tcp_trace; #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 -#ifdef __cplusplus -extern "C" { -#endif - grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle, grpc_resource_quota* resource_quota, char* peer_string); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_UV */ #endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 04922b4037..33868cdc7a 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -49,7 +49,7 @@ #define GRPC_FIONBIO FIONBIO #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); +grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); static grpc_error* set_non_block(SOCKET sock) { int status; @@ -124,7 +124,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -137,7 +137,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 9c7ccdf132..28287e2795 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -35,10 +35,6 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/socket_windows.h" -#ifdef __cplusplus -extern "C" { -#endif - /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ @@ -48,10 +44,6 @@ grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_winsocket* socket, grpc_error* grpc_tcp_prepare_socket(SOCKET sock); -#ifdef __cplusplus -} -#endif - #endif #endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */ diff --git a/src/core/lib/iomgr/time_averaged_stats.h b/src/core/lib/iomgr/time_averaged_stats.h index d38ed272b6..8745f7fa13 100644 --- a/src/core/lib/iomgr/time_averaged_stats.h +++ b/src/core/lib/iomgr/time_averaged_stats.h @@ -19,10 +19,6 @@ #ifndef GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H #define GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H -#ifdef __cplusplus -extern "C" { -#endif - /* This tracks a time-decaying weighted average. It works by collecting batches of samples and then mixing their average into a time-decaying weighted mean. It is designed for batch operations where we do many adds @@ -74,8 +70,4 @@ void grpc_time_averaged_stats_add_sample(grpc_time_averaged_stats* stats, value. */ double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats* stats); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */ diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index cd8334eceb..b9acce229e 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -32,10 +32,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef struct grpc_timer grpc_timer; /* Initialize *timer. When expired or canceled, closure will be called with @@ -106,8 +102,4 @@ void grpc_timer_consume_kick(void); void grpc_kick_poller(void); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TIMER_H */ diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index a4bfbcb342..fa95c43dbe 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -42,11 +42,8 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 -extern "C" { -grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); -grpc_tracer_flag grpc_timer_check_trace = - GRPC_TRACER_INITIALIZER(false, "timer_check"); -} +grpc_core::TraceFlag grpc_timer_trace(false, "timer"); +grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check"); /* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with * deadlines earlier than 'queue_deadline" cap are maintained in the heap and @@ -253,8 +250,6 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) { g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); - grpc_register_tracer(&grpc_timer_trace); - grpc_register_tracer(&grpc_timer_check_trace); for (i = 0; i < g_num_shards; i++) { timer_shard* shard = &g_shards[i]; @@ -339,7 +334,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, timer->hash_table_next = nullptr; #endif - if (GRPC_TRACER_ON(grpc_timer_trace)) { + if (grpc_timer_trace.enabled()) { gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); @@ -375,7 +370,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, timer->heap_index = INVALID_HEAP_INDEX; list_join(&shard->list, timer); } - if (GRPC_TRACER_ON(grpc_timer_trace)) { + if (grpc_timer_trace.enabled()) { gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR " => is_first_timer=%s", @@ -397,7 +392,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, grpc_timer_check. */ if (is_first_timer) { gpr_mu_lock(&g_shared_mutables.mu); - if (GRPC_TRACER_ON(grpc_timer_trace)) { + if (grpc_timer_trace.enabled()) { gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, shard->min_deadline); } @@ -427,7 +422,7 @@ void grpc_timer_cancel(grpc_exec_ctx* exec_ctx, grpc_timer* timer) { timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; gpr_mu_lock(&shard->mu); - if (GRPC_TRACER_ON(grpc_timer_trace)) { + if (grpc_timer_trace.enabled()) { gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, timer->pending ? "true" : "false"); } @@ -468,7 +463,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { saturating_add(GPR_MAX(now, shard->queue_deadline_cap), (gpr_atm)(deadline_delta * 1000.0)); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR, (int)(shard - g_shards), shard->queue_deadline_cap); } @@ -476,7 +471,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { next = timer->next; if (timer->deadline < shard->queue_deadline_cap) { - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", timer->deadline); } @@ -493,7 +488,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { grpc_timer* timer; for (;;) { - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s", (int)(shard - g_shards), grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false"); @@ -503,13 +498,13 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { if (!refill_heap(shard, now)) return nullptr; } timer = grpc_timer_heap_top(&shard->heap); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, timer->deadline, now); } if (timer->deadline > now) return nullptr; - if (GRPC_TRACER_ON(grpc_timer_trace)) { + if (grpc_timer_trace.enabled()) { gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler", timer, now - timer->deadline, timer->closure->scheduler->vtable->name); @@ -534,7 +529,7 @@ static size_t pop_timers(grpc_exec_ctx* exec_ctx, timer_shard* shard, } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR, (int)(shard - g_shards), n); } @@ -558,7 +553,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, gpr_mu_lock(&g_shared_mutables.mu); result = GRPC_TIMERS_CHECKED_AND_EMPTY; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, (int)(g_shard_queue[0] - g_shards), g_shard_queue[0]->min_deadline); @@ -576,7 +571,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, result = GRPC_TIMERS_FIRED; } - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, " .. result --> %d" ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR @@ -621,7 +616,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, if (next != nullptr) { *next = GPR_MIN(*next, min_timer); } - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, min_timer); @@ -635,7 +630,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); // tracing - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { char* next_str; if (next == nullptr) { next_str = gpr_strdup("NULL"); @@ -653,7 +648,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, grpc_timer_check_result r = run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { char* next_str; if (next == nullptr) { next_str = gpr_strdup("NULL"); diff --git a/src/core/lib/iomgr/timer_heap.h b/src/core/lib/iomgr/timer_heap.h index ae56e5a73e..436eef55a6 100644 --- a/src/core/lib/iomgr/timer_heap.h +++ b/src/core/lib/iomgr/timer_heap.h @@ -21,10 +21,6 @@ #include "src/core/lib/iomgr/timer.h" -#ifdef __cplusplus -extern "C" { -#endif - typedef struct { grpc_timer** timers; uint32_t timer_count; @@ -43,8 +39,4 @@ void grpc_timer_heap_pop(grpc_timer_heap* heap); int grpc_timer_heap_is_empty(grpc_timer_heap* heap); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */ diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index acc40b6c9e..87ed0e05dc 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -33,7 +33,7 @@ typedef struct completed_thread { struct completed_thread* next; } completed_thread; -extern "C" grpc_tracer_flag grpc_timer_check_trace; +extern grpc_core::TraceFlag grpc_timer_check_trace; // global mutex static gpr_mu g_mu; @@ -81,7 +81,7 @@ static void start_timer_thread_and_unlock(void) { ++g_waiter_count; ++g_thread_count; gpr_mu_unlock(&g_mu); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } gpr_thd_options opt = gpr_thd_options_default(); @@ -93,7 +93,7 @@ static void start_timer_thread_and_unlock(void) { // to leak through g_completed_threads and be freed in gc_completed_threads() // before "&ct->t" is written to, causing a use-after-free. gpr_mu_lock(&g_mu); - gpr_thd_new(&ct->t, timer_thread, ct, &opt); + gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt); gpr_mu_unlock(&g_mu); } @@ -115,7 +115,7 @@ static void run_some_timers(grpc_exec_ctx* exec_ctx) { // if there's no thread waiting with a timeout, kick an existing // waiter so that the next deadline is not missed if (!g_has_timed_waiter) { - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "kick untimed waiter"); } gpr_cv_signal(&g_cv_wait); @@ -123,7 +123,7 @@ static void run_some_timers(grpc_exec_ctx* exec_ctx) { gpr_mu_unlock(&g_mu); } // without our lock, flush the exec_ctx - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "flush exec_ctx"); } grpc_exec_ctx_flush(exec_ctx); @@ -178,7 +178,7 @@ static bool wait_until(grpc_exec_ctx* exec_ctx, grpc_millis next) { g_has_timed_waiter = true; g_timed_waiter_deadline = next; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", wait_time); @@ -188,15 +188,14 @@ static bool wait_until(grpc_exec_ctx* exec_ctx, grpc_millis next) { } } - if (GRPC_TRACER_ON(grpc_timer_check_trace) && - next == GRPC_MILLIS_INF_FUTURE) { + if (grpc_timer_check_trace.enabled() && next == GRPC_MILLIS_INF_FUTURE) { gpr_log(GPR_DEBUG, "sleep until kicked"); } gpr_cv_wait(&g_cv_wait, &g_mu, grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME)); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); @@ -226,10 +225,6 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) { grpc_millis next = GRPC_MILLIS_INF_FUTURE; grpc_exec_ctx_invalidate_now(exec_ctx); - /* Calibrate g_start_time in exec_ctx.cc with a regular interval in case the - * system clock has changed */ - grpc_exec_ctx_maybe_update_start_time(exec_ctx); - // check timer state, updates next to the next time to run a check switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: @@ -245,7 +240,7 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) { Consequently, we can just sleep forever here and be happy at some saved wakeup cycles. */ - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } next = GRPC_MILLIS_INF_FUTURE; @@ -271,7 +266,7 @@ static void timer_thread_cleanup(completed_thread* ct) { ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "End timer thread"); } } @@ -314,18 +309,18 @@ void grpc_timer_manager_init(void) { static void stop_threads(void) { gpr_mu_lock(&g_mu); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "stop timer threads: threaded=%d", g_threaded); } if (g_threaded) { g_threaded = false; gpr_cv_broadcast(&g_cv_wait); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); } while (g_thread_count > 0) { gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); } gc_completed_threads(); diff --git a/src/core/lib/iomgr/timer_manager.h b/src/core/lib/iomgr/timer_manager.h index 72960d6ffc..0ba502928a 100644 --- a/src/core/lib/iomgr/timer_manager.h +++ b/src/core/lib/iomgr/timer_manager.h @@ -21,10 +21,6 @@ #include <stdbool.h> -#ifdef __cplusplus -extern "C" { -#endif - /* Timer Manager tries to keep one thread waiting for the next timeout at all times */ @@ -38,8 +34,4 @@ void grpc_timer_manager_set_threading(bool enabled); * disabled */ void grpc_timer_manager_tick(void); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */ diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc index df40e54ae6..fac2026fa9 100644 --- a/src/core/lib/iomgr/timer_uv.cc +++ b/src/core/lib/iomgr/timer_uv.cc @@ -29,11 +29,8 @@ #include <uv.h> -extern "C" { -grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); -grpc_tracer_flag grpc_timer_check_trace = - GRPC_TRACER_INITIALIZER(false, "timer_check"); -} +grpc_core::TraceFlag grpc_timer_trace(false, "timer"); +grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check"); static void timer_close_callback(uv_handle_t* handle) { gpr_free(handle); } diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 68ab9355ca..7b7d6946b1 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -47,6 +47,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -71,14 +72,22 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + // To be scheduled on another thread to actually read/write. + grpc_closure do_read_closure; + grpc_closure do_write_closure; + grpc_closure notify_on_write_closure; // True if orphan_cb is trigered. bool orphan_notified; + // True if grpc_fd_notify_on_write() is called after on_write() call. + bool notify_on_write_armed; + // True if fd has been shutdown. + bool already_shutdown; struct grpc_udp_listener* next; }; struct shutdown_fd_args { - grpc_fd* fd; + grpc_udp_listener* sp; gpr_mu* server_mu; }; @@ -144,8 +153,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args, grpc_error* error) { struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; + grpc_udp_listener* sp = shutdown_args->sp; + gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error)); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error)); + sp->already_shutdown = true; + if (!sp->notify_on_write_armed) { + // Re-arm write notification to notify listener with error. This is + // necessary to decrement active_ports. + sp->notify_on_write_armed = true; + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + } gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } @@ -161,6 +179,7 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { gpr_mu_destroy(&s->mu); + gpr_log(GPR_DEBUG, "Destroy all listeners."); while (s->head) { grpc_udp_listener* sp = s->head; s->head = sp->next; @@ -207,9 +226,10 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. Because at this point, all listening ports * have been shutdown already, no need to shutdown again.*/ - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd, + GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); + gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); } @@ -233,13 +253,14 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, s->shutdown_complete = on_done; + gpr_log(GPR_DEBUG, "start to destroy udp_server"); /* shutdown all fd's */ if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); struct shutdown_fd_args* args = (struct shutdown_fd_args*)gpr_malloc(sizeof(*args)); - args->fd = sp->emfd; + args->sp = sp; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); @@ -329,6 +350,28 @@ error: return -1; } +static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); + /* TODO: the reason we hold server->mu here is merely to prevent fd + * shutdown while we are reading. However, it blocks do_write(). Switch to + * read lock if available. */ + gpr_mu_lock(&sp->server->mu); + /* Tell the registered callback that data is available to read. */ + if (!sp->already_shutdown && + sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + /* There maybe more packets to read. Schedule read_more_cb_ closure to run + * after finishing this event loop. */ + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + } else { + /* Finish reading all the packets, re-arm the notification event so we can + * get another chance to read. Or fd already shutdown, re-arm to get a + * notification with shutdown error. */ + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; @@ -343,13 +386,51 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } return; } - - /* Tell the registered callback that data is available to read. */ + /* Read once. If there is more data to read, off load the work to another + * thread to finish. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data); + if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + /* There maybe more packets to read. Schedule read_more_cb_ closure to run + * after finishing this event loop. */ + GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + } else { + /* Finish reading all the packets, re-arm the notification event so we can + * get another chance to read. Or fd already shutdown, re-arm to get a + * notification with shutdown error. */ + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + +// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. +void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + gpr_mu_lock(&sp->server->mu); + if (!sp->notify_on_write_armed) { + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + sp->notify_on_write_armed = true; + } + gpr_mu_unlock(&sp->server->mu); +} - /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); +static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + gpr_mu_lock(&(sp->server->mu)); + if (sp->already_shutdown) { + // If fd has been shutdown, don't write any more and re-arm notification. + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + } else { + sp->notify_on_write_armed = false; + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, + arg, grpc_schedule_on_exec_ctx); + sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data, + &sp->notify_on_write_closure); + } gpr_mu_unlock(&sp->server->mu); } @@ -367,12 +448,11 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { return; } - /* Tell the registered callback that the socket is writeable. */ - GPR_ASSERT(sp->write_cb); - sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data); + /* Schedule actual write in another thread. */ + GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - /* Re-arm the notification event so we get another chance to write. */ - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE); gpr_mu_unlock(&sp->server->mu); } @@ -409,6 +489,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; sp->orphan_notified = false; + sp->already_shutdown = false; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); @@ -533,6 +614,7 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); + sp->notify_on_write_armed = true; grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); /* Registered for both read and write callbacks: increment active_ports diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index bca0f049fb..1bd6922de6 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -23,10 +23,6 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" -#ifdef __cplusplus -extern "C" { -#endif - /* Forward decl of struct grpc_server */ /* This is not typedef'ed to avoid a typedef-redefinition error */ struct grpc_server; @@ -34,13 +30,16 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, +/* Called when data is available to read from the socket. + * Return true if there is more data to read from fd. */ +typedef bool (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data); -/* Called when the socket is writeable. */ +/* Called when the socket is writeable. The given closure should be scheduled + * when the socket becomes blocked next time. */ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, - void* user_data); + void* user_data, + grpc_closure* notify_on_write_closure); /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx* exec_ctx, @@ -77,8 +76,4 @@ int grpc_udp_server_add_port(grpc_udp_server* s, void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* server, grpc_closure* on_done); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */ diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h index be3c33d9c2..1c079e6e76 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.h +++ b/src/core/lib/iomgr/unix_sockets_posix.h @@ -25,10 +25,6 @@ #include "src/core/lib/iomgr/resolve_address.h" -#ifdef __cplusplus -extern "C" { -#endif - void grpc_create_socketpair_if_unix(int sv[2]); grpc_error* grpc_resolve_unix_domain_address( @@ -42,8 +38,4 @@ void grpc_unlink_if_unix_domain_socket( char* grpc_sockaddr_to_uri_unix_if_possible( const grpc_resolved_address* resolved_addr); -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index dcd7bdb560..017e41bfa8 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -40,10 +40,6 @@ #define GRPC_FD_TO_IDX(fd) (-(fd)-1) #define GRPC_IDX_TO_FD(idx) (-(idx)-1) -#ifdef __cplusplus -extern "C" { -#endif - typedef struct cv_node { gpr_cv* cv; struct cv_node* next; @@ -68,8 +64,4 @@ typedef struct cv_fd_table { extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.h b/src/core/lib/iomgr/wakeup_fd_pipe.h index 9bbb5e2ff7..326a0c4e01 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.h +++ b/src/core/lib/iomgr/wakeup_fd_pipe.h @@ -21,14 +21,6 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#ifdef __cplusplus -extern "C" { -#endif - extern const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable; -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_PIPE_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index ae7849f98c..a9584d0d48 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -49,10 +49,6 @@ #include "src/core/lib/iomgr/error.h" -#ifdef __cplusplus -extern "C" { -#endif - void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); @@ -95,8 +91,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info); * wakeup_fd_nospecial.c if no such implementation exists. */ extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable; -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H */ |