diff options
author | ncteisen <ncteisen@gmail.com> | 2017-11-17 16:16:06 -0800 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2017-11-17 16:17:21 -0800 |
commit | 0392f8acc33a89fe7d255e6cda0f96eb3a0ec6ab (patch) | |
tree | 51234a00cf528a1fdf8e73cad2cc459925b1673c /src/core/lib | |
parent | aa3b19741f345faa3eb3d9bdcfa9d7064e5c439c (diff) | |
parent | 90c8cf6acc698ddef1d2da3b205ad8d0014b52fa (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into tracing++
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/closure.cc | 217 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 246 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.cc | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.cc | 81 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.cc | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.cc | 123 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.cc | 4 | ||||
-rw-r--r-- | src/core/lib/support/cpu_linux.cc | 10 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 10 |
10 files changed, 299 insertions, 402 deletions
diff --git a/src/core/lib/iomgr/closure.cc b/src/core/lib/iomgr/closure.cc deleted file mode 100644 index ee4f826eb7..0000000000 --- a/src/core/lib/iomgr/closure.cc +++ /dev/null @@ -1,217 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/closure.h" - -#include <assert.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -#include "src/core/lib/profiling/timers.h" - -grpc_core::DebugOnlyTraceFlag grpc_trace_closure(false, "closure"); - -#ifndef NDEBUG -grpc_closure* grpc_closure_init(const char* file, int line, - grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler) { -#else -grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler) { -#endif - closure->cb = cb; - closure->cb_arg = cb_arg; - closure->scheduler = scheduler; -#ifndef NDEBUG - closure->scheduled = false; - closure->file_initiated = nullptr; - closure->line_initiated = 0; - closure->run = false; - closure->file_created = file; - closure->line_created = line; -#endif - return closure; -} - -void grpc_closure_list_init(grpc_closure_list* closure_list) { - closure_list->head = closure_list->tail = nullptr; -} - -bool grpc_closure_list_append(grpc_closure_list* closure_list, - grpc_closure* closure, grpc_error* error) { - if (closure == nullptr) { - GRPC_ERROR_UNREF(error); - return false; - } - closure->error_data.error = error; - closure->next_data.next = nullptr; - bool was_empty = (closure_list->head == nullptr); - if (was_empty) { - closure_list->head = closure; - } else { - closure_list->tail->next_data.next = closure; - } - closure_list->tail = closure; - return was_empty; -} - -void grpc_closure_list_fail_all(grpc_closure_list* list, - grpc_error* forced_failure) { - for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) { - if (c->error_data.error == GRPC_ERROR_NONE) { - c->error_data.error = GRPC_ERROR_REF(forced_failure); - } - } - GRPC_ERROR_UNREF(forced_failure); -} - -bool grpc_closure_list_empty(grpc_closure_list closure_list) { - return closure_list.head == nullptr; -} - -void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst) { - if (src->head == nullptr) { - return; - } - if (dst->head == nullptr) { - *dst = *src; - } else { - dst->tail->next_data.next = src->head; - dst->tail = src->tail; - } - src->head = src->tail = nullptr; -} - -typedef struct { - grpc_iomgr_cb_func cb; - void* cb_arg; - grpc_closure wrapper; -} wrapped_closure; - -static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - wrapped_closure* wc = (wrapped_closure*)arg; - grpc_iomgr_cb_func cb = wc->cb; - void* cb_arg = wc->cb_arg; - gpr_free(wc); - cb(exec_ctx, cb_arg, error); -} - -#ifndef NDEBUG -grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { -#else -grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { -#endif - wrapped_closure* wc = (wrapped_closure*)gpr_malloc(sizeof(*wc)); - wc->cb = cb; - wc->cb_arg = cb_arg; -#ifndef NDEBUG - grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler); -#else - grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); -#endif - return &wc->wrapper; -} - -#ifndef NDEBUG -void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* c, grpc_error* error) { -#else -void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, - grpc_error* error) { -#endif - GPR_TIMER_BEGIN("grpc_closure_run", 0); - if (c != nullptr) { -#ifndef NDEBUG - c->file_initiated = file; - c->line_initiated = line; - c->run = true; -#endif - assert(c->cb); - c->scheduler->vtable->run(exec_ctx, c, error); - } else { - GRPC_ERROR_UNREF(error); - } - GPR_TIMER_END("grpc_closure_run", 0); -} - -#ifndef NDEBUG -void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* c, grpc_error* error) { -#else -void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, - grpc_error* error) { -#endif - GPR_TIMER_BEGIN("grpc_closure_sched", 0); - if (c != nullptr) { -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; -#endif - assert(c->cb); - c->scheduler->vtable->sched(exec_ctx, c, error); - } else { - GRPC_ERROR_UNREF(error); - } - GPR_TIMER_END("grpc_closure_sched", 0); -} - -#ifndef NDEBUG -void grpc_closure_list_sched(const char* file, int line, - grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { -#else -void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { -#endif - grpc_closure* c = list->head; - while (c != nullptr) { - grpc_closure* next = c->next_data.next; -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; -#endif - assert(c->cb); - c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); - c = next; - } - list->head = list->tail = nullptr; -} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 5171ac1529..46793dd2c5 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -21,15 +21,15 @@ #include <grpc/support/port_platform.h> +#include <assert.h> #include <grpc/impl/codegen/exec_ctx_fwd.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include <stdbool.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/mpscq.h" -#ifdef __cplusplus -extern "C" { -#endif - struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -83,8 +83,8 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void* cb_arg; - /** Scheduler to schedule against: NULL to schedule against current execution - context */ + /** Scheduler to schedule against: nullptr to schedule against current + execution context */ grpc_closure_scheduler* scheduler; /** Once queued, the result of the closure. Before then: scratch space */ @@ -105,102 +105,262 @@ struct grpc_closure { #endif }; +#ifndef NDEBUG +inline grpc_closure* grpc_closure_init(const char* file, int line, + grpc_closure* closure, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#else +inline grpc_closure* grpc_closure_init(grpc_closure* closure, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#endif + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->scheduler = scheduler; +#ifndef NDEBUG + closure->scheduled = false; + closure->file_initiated = nullptr; + closure->line_initiated = 0; + closure->run = false; + closure->file_created = file; + closure->line_created = line; +#endif + return closure; +} + /** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */ #ifndef NDEBUG -grpc_closure* grpc_closure_init(const char* file, int line, - grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler) #else -grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ grpc_closure_init(closure, cb, cb_arg, scheduler) #endif +namespace closure_impl { + +typedef struct { + grpc_iomgr_cb_func cb; + void* cb_arg; + grpc_closure wrapper; +} wrapped_closure; + +inline void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + wrapped_closure* wc = (wrapped_closure*)arg; + grpc_iomgr_cb_func cb = wc->cb; + void* cb_arg = wc->cb_arg; + gpr_free(wc); + cb(exec_ctx, cb_arg, error); +} + +} // namespace closure_impl + +#ifndef NDEBUG +inline grpc_closure* grpc_closure_create(const char* file, int line, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#else +inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#endif + closure_impl::wrapped_closure* wc = + (closure_impl::wrapped_closure*)gpr_malloc(sizeof(*wc)); + wc->cb = cb; + wc->cb_arg = cb_arg; +#ifndef NDEBUG + grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, wc, + scheduler); +#else + grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc, scheduler); +#endif + return &wc->wrapper; +} + /* Create a heap allocated closure: try to avoid except for very rare events */ #ifndef NDEBUG -grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler) #else -grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ grpc_closure_create(cb, cb_arg, scheduler) #endif #define GRPC_CLOSURE_LIST_INIT \ - { NULL, NULL } + { nullptr, nullptr } -void grpc_closure_list_init(grpc_closure_list* list); +inline void grpc_closure_list_init(grpc_closure_list* closure_list) { + closure_list->head = closure_list->tail = nullptr; +} /** add \a closure to the end of \a list and set \a closure's result to \a error Returns true if \a list becomes non-empty */ -bool grpc_closure_list_append(grpc_closure_list* list, grpc_closure* closure, - grpc_error* error); +inline bool grpc_closure_list_append(grpc_closure_list* closure_list, + grpc_closure* closure, grpc_error* error) { + if (closure == nullptr) { + GRPC_ERROR_UNREF(error); + return false; + } + closure->error_data.error = error; + closure->next_data.next = nullptr; + bool was_empty = (closure_list->head == nullptr); + if (was_empty) { + closure_list->head = closure; + } else { + closure_list->tail->next_data.next = closure; + } + closure_list->tail = closure; + return was_empty; +} /** force all success bits in \a list to false */ -void grpc_closure_list_fail_all(grpc_closure_list* list, - grpc_error* forced_failure); +inline void grpc_closure_list_fail_all(grpc_closure_list* list, + grpc_error* forced_failure) { + for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) { + if (c->error_data.error == GRPC_ERROR_NONE) { + c->error_data.error = GRPC_ERROR_REF(forced_failure); + } + } + GRPC_ERROR_UNREF(forced_failure); +} /** append all closures from \a src to \a dst and empty \a src. */ -void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst); +inline void grpc_closure_list_move(grpc_closure_list* src, + grpc_closure_list* dst) { + if (src->head == nullptr) { + return; + } + if (dst->head == nullptr) { + *dst = *src; + } else { + dst->tail->next_data.next = src->head; + dst->tail = src->tail; + } + src->head = src->tail = nullptr; +} /** return whether \a list is empty. */ -bool grpc_closure_list_empty(grpc_closure_list list); +inline bool grpc_closure_list_empty(grpc_closure_list closure_list) { + return closure_list.head == nullptr; +} + +#ifndef NDEBUG +inline void grpc_closure_run(const char* file, int line, + grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#else +inline void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#endif + GPR_TIMER_BEGIN("grpc_closure_run", 0); + if (c != nullptr) { +#ifndef NDEBUG + c->file_initiated = file; + c->line_initiated = line; + c->run = true; +#endif + assert(c->cb); + c->scheduler->vtable->run(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_run", 0); +} /** Run a closure directly. Caller ensures that no locks are being held above. * Note that calling this at the end of a closure callback function itself is * by definition safe. */ #ifndef NDEBUG -void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* closure, grpc_error* error); #define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error) #else -void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure, - grpc_error* error); #define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ grpc_closure_run(exec_ctx, closure, error) #endif +#ifndef NDEBUG +inline void grpc_closure_sched(const char* file, int line, + grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#else +inline void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#endif + GPR_TIMER_BEGIN("grpc_closure_sched", 0); + if (c != nullptr) { +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; +#endif + assert(c->cb); + c->scheduler->vtable->sched(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_sched", 0); +} + /** Schedule a closure to be run. Does not need to be run from a safe point. */ #ifndef NDEBUG -void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* closure, grpc_error* error); #define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error) #else -void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, - grpc_error* error); #define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ grpc_closure_sched(exec_ctx, closure, error) #endif +#ifndef NDEBUG +inline void grpc_closure_list_sched(const char* file, int line, + grpc_exec_ctx* exec_ctx, + grpc_closure_list* list) { +#else +inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, + grpc_closure_list* list) { +#endif + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; +#endif + assert(c->cb); + c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + /** Schedule all closures in a list to be run. Does not need to be run from a * safe point. */ #ifndef NDEBUG -void grpc_closure_list_sched(const char* file, int line, - grpc_exec_ctx* exec_ctx, - grpc_closure_list* closure_list); #define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list) #else -void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, - grpc_closure_list* closure_list); #define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ grpc_closure_list_sched(exec_ctx, closure_list) #endif -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 46432e4ff2..e6d640c106 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -39,6 +39,7 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_error_refcount(false, "error_refcount"); +grpc_core::DebugOnlyTraceFlag grpc_trace_closure(false, "closure"); static const char* error_int_name(grpc_error_ints key) { switch (key) { diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index b637da377e..1777456342 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -25,9 +25,6 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" -#define GRPC_START_TIME_UPDATE_INTERVAL 10000 -extern grpc_core::TraceFlag grpc_timer_check_trace; - bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx* exec_ctx) { if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { if (exec_ctx->check_ready_to_finish(exec_ctx, @@ -107,49 +104,16 @@ static void exec_ctx_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -/* This time pair is not entirely thread-safe as store/load of tv_sec and - * tv_nsec are performed separately. However g_start_time do not need to have - * sub-second precision, so it is ok if the value of tv_nsec is off in this - * case. */ -typedef struct time_atm_pair { - gpr_atm tv_sec; - gpr_atm tv_nsec; -} time_atm_pair; - -static time_atm_pair - g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the - // last enum value in - // gpr_clock_type -static grpc_millis g_last_start_time_update; - -static gpr_timespec timespec_from_time_atm_pair(const time_atm_pair* src, - gpr_clock_type clock_type) { - gpr_timespec time; - time.tv_nsec = (int32_t)gpr_atm_no_barrier_load(&src->tv_nsec); - time.tv_sec = (int64_t)gpr_atm_no_barrier_load(&src->tv_sec); - time.clock_type = clock_type; - return time; -} - -static void time_atm_pair_store(time_atm_pair* dst, const gpr_timespec src) { - gpr_atm_no_barrier_store(&dst->tv_sec, src.tv_sec); - gpr_atm_no_barrier_store(&dst->tv_nsec, src.tv_nsec); -} +static gpr_timespec g_start_time; void grpc_exec_ctx_global_init(void) { - for (int i = 0; i < GPR_TIMESPAN; i++) { - time_atm_pair_store(&g_start_time[i], gpr_now((gpr_clock_type)i)); - } - // allows uniform treatment in conversion functions - time_atm_pair_store(&g_start_time[GPR_TIMESPAN], gpr_time_0(GPR_TIMESPAN)); + g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); } void grpc_exec_ctx_global_shutdown(void) {} static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type); - ts = gpr_time_sub(ts, start_time); + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; if (x < 0) return 0; @@ -158,9 +122,7 @@ static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { } static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type); - ts = gpr_time_sub(ts, start_time); + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; @@ -195,41 +157,18 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, if (clock_type == GPR_TIMESPAN) { return gpr_time_from_millis(millis, GPR_TIMESPAN); } - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[clock_type], clock_type); - return gpr_time_add(start_time, gpr_time_from_millis(millis, GPR_TIMESPAN)); + return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type), + gpr_time_from_millis(millis, GPR_TIMESPAN)); } grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { - return timespec_to_atm_round_down(ts); + return timespec_to_atm_round_down( + gpr_convert_clock_type(ts, g_start_time.clock_type)); } grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { - return timespec_to_atm_round_up(ts); -} - -void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx) { - grpc_millis now = grpc_exec_ctx_now(exec_ctx); - grpc_millis last_start_time_update = - gpr_atm_no_barrier_load(&g_last_start_time_update); - - if (now > last_start_time_update && - now - last_start_time_update > GRPC_START_TIME_UPDATE_INTERVAL) { - /* Get the current system time and subtract \a now from it, where \a now is - * the relative time from grpc_init() from monotonic clock. This calibrates - * the time when grpc_exec_ctx_global_init was called based on current - * system clock. */ - gpr_atm_no_barrier_store(&g_last_start_time_update, now); - gpr_timespec real_now = gpr_now(GPR_CLOCK_REALTIME); - gpr_timespec real_start_time = - gpr_time_sub(real_now, gpr_time_from_millis(now, GPR_TIMESPAN)); - time_atm_pair_store(&g_start_time[GPR_CLOCK_REALTIME], real_start_time); - - if (grpc_timer_check_trace.enabled()) { - gpr_log(GPR_DEBUG, "Update realtime clock start time: %" PRId64 "s %dns", - real_start_time.tv_sec, real_start_time.tv_nsec); - } - } + return timespec_to_atm_round_up( + gpr_convert_clock_type(ts, g_start_time.clock_type)); } static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 6035e08361..bd27506152 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -124,8 +124,6 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); -void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx); - #ifdef __cplusplus } #endif diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 40bde7ccff..ccd8d9f379 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -895,10 +895,3 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length, &slice_allocator->on_allocated); } - -grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, - size_t size) { - grpc_resource_user_alloc(exec_ctx, resource_user, size, nullptr); - return ru_slice_create(resource_user, size); -} diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index b752153fc7..40f4006203 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -52,12 +52,12 @@ typedef struct { grpc_closure* read_cb; grpc_closure* write_cb; - grpc_slice read_slice; grpc_slice_buffer* read_slices; grpc_slice_buffer* write_slices; uv_buf_t* write_buffers; grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; bool shutting_down; @@ -66,7 +66,6 @@ typedef struct { } grpc_tcp; static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->handle); gpr_free(tcp->peer_string); @@ -119,91 +118,117 @@ static void uv_close_callback(uv_handle_t* handle) { grpc_exec_ctx_finish(&exec_ctx); } -static grpc_slice alloc_read_slice(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user) { - return grpc_resource_user_slice_malloc(exec_ctx, resource_user, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); -} - static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp* tcp = (grpc_tcp*)handle->data; (void)suggested_size; - buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slice); - buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); + /* Before calling uv_read_start, we allocate a buffer with exactly one slice + * to tcp->read_slices and wait for the callback indicating that the + * allocation was successful. So slices[0] should always exist here */ + buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]); + buf->len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]); grpc_exec_ctx_finish(&exec_ctx); } +static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, + grpc_error* error) { + grpc_closure* cb = tcp->read_cb; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); + gpr_free(dump); + } + } + tcp->read_slices = NULL; + tcp->read_cb = NULL; + GRPC_CLOSURE_RUN(exec_ctx, cb, error); +} + static void read_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - grpc_slice sub; grpc_error* error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp* tcp = (grpc_tcp*)stream->data; - grpc_closure* cb = tcp->read_cb; + grpc_slice_buffer garbage; if (nread == 0) { // Nothing happened. Wait for the next callback return; } TCP_UNREF(&exec_ctx, tcp, "read"); - tcp->read_cb = NULL; // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); if (nread == UV_EOF) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices); } else if (nread > 0) { // Successful read - sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); - grpc_slice_buffer_add(tcp->read_slices, sub); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); error = GRPC_ERROR_NONE; - if (grpc_tcp_trace.enabled()) { - size_t i; - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "read: error=%s", str); - - for (i = 0; i < tcp->read_slices->count; i++) { - char* dump = grpc_dump_slice(tcp->read_slices->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, - dump); - gpr_free(dump); - } + if ((size_t)nread < tcp->read_slices->length) { + /* TODO(murgatroid99): Instead of discarding the unused part of the read + * buffer, reuse it as the next read buffer. */ + grpc_slice_buffer_init(&garbage); + grpc_slice_buffer_trim_end( + tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &garbage); } } else { // nread < 0: Error error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices); } - GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); + call_read_cb(&exec_ctx, tcp, error); grpc_exec_ctx_finish(&exec_ctx); } +static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, + grpc_error* error) { + int status; + grpc_tcp* tcp = (grpc_tcp*)tcpp; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } + if (error == GRPC_ERROR_NONE) { + status = + uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + } + } + if (error != GRPC_ERROR_NONE) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->read_slices); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(exec_ctx, tcp, "read"); + } + if (grpc_tcp_trace.enabled()) { + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); + } +} + static void uv_endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_slice_buffer* read_slices, grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; - int status; - grpc_error* error = GRPC_ERROR_NONE; GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices); TCP_REF(tcp, "read"); - // TODO(murgatroid99): figure out what the return value here means - status = - uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); - if (status != 0) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); - GRPC_CLOSURE_SCHED(exec_ctx, cb, error); - } - if (grpc_tcp_trace.enabled()) { - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); - } + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + tcp->read_slices); } static void write_callback(uv_write_t* req, int status) { @@ -223,8 +248,6 @@ static void write_callback(uv_write_t* req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - grpc_resource_user_free(&exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * tcp->write_slices->count); GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -271,8 +294,6 @@ static void uv_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * buffer_count); - grpc_resource_user_alloc(exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; buffers[i].base = (char*)GRPC_SLICE_START_PTR(*slice); @@ -381,8 +402,10 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle, gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; + tcp->read_slices = NULL; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 383cc6881f..dac74aea24 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -225,10 +225,6 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) { grpc_millis next = GRPC_MILLIS_INF_FUTURE; grpc_exec_ctx_invalidate_now(exec_ctx); - /* Calibrate g_start_time in exec_ctx.cc with a regular interval in case the - * system clock has changed */ - grpc_exec_ctx_maybe_update_start_time(exec_ctx); - // check timer state, updates next to the next time to run a check switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: diff --git a/src/core/lib/support/cpu_linux.cc b/src/core/lib/support/cpu_linux.cc index 2280668442..21b1a71dc9 100644 --- a/src/core/lib/support/cpu_linux.cc +++ b/src/core/lib/support/cpu_linux.cc @@ -36,6 +36,13 @@ static int ncpus = 0; static void init_num_cpus() { +#ifndef GPR_MUSL_LIBC_COMPAT + if (sched_getcpu() < 0) { + gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); + ncpus = 1; + return; + } +#endif /* This must be signed. sysconf returns -1 when the number cannot be determined */ ncpus = (int)sysconf(_SC_NPROCESSORS_ONLN); @@ -56,6 +63,9 @@ unsigned gpr_cpu_current_cpu(void) { // sched_getcpu() is undefined on musl return 0; #else + if (gpr_cpu_num_cores() == 1) { + return 0; + } int cpu = sched_getcpu(); if (cpu < 0) { gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 39563a8bb3..57bb6cc18b 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -349,14 +349,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx, grpc_error* error) { requested_call* rc; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call*)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != nullptr) { + while ((rc = (requested_call*)gpr_locked_mpscq_pop( + &rm->requests_per_cq[i])) != nullptr) { fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); } } |