From b6b8de1758be800f170a15d7cf05fffc04619bb8 Mon Sep 17 00:00:00 2001 From: Ian Coolidge Date: Thu, 19 Oct 2017 14:50:26 -0700 Subject: reland: cpu_linux: Don't spam sched_getcpu failures on qemu __NR_getcpu isn't implemented on qemu, and for some reason sysconf(_SC_NPROCESSORS_ONLN) returns the number of processers of the host system, giving a false indication that there is more than one cpu for the qemu case. Expand the init_num_cpus sequence to also run sched_getcpu once, if GPR_MUSL_LIBC_COMPAT isn't defined. If that call isn't supported, initialize 'ncpus' to 1. Later, in gpr_cpu_current_cpu, use gpr_cpu_num_cores to avoid the system call in cases where we know it isn't supported, or if the ncpus is otherwise 1. --- src/core/lib/support/cpu_linux.cc | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'src/core/lib') diff --git a/src/core/lib/support/cpu_linux.cc b/src/core/lib/support/cpu_linux.cc index 2280668442..21b1a71dc9 100644 --- a/src/core/lib/support/cpu_linux.cc +++ b/src/core/lib/support/cpu_linux.cc @@ -36,6 +36,13 @@ static int ncpus = 0; static void init_num_cpus() { +#ifndef GPR_MUSL_LIBC_COMPAT + if (sched_getcpu() < 0) { + gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); + ncpus = 1; + return; + } +#endif /* This must be signed. sysconf returns -1 when the number cannot be determined */ ncpus = (int)sysconf(_SC_NPROCESSORS_ONLN); @@ -56,6 +63,9 @@ unsigned gpr_cpu_current_cpu(void) { // sched_getcpu() is undefined on musl return 0; #else + if (gpr_cpu_num_cores() == 1) { + return 0; + } int cpu = sched_getcpu(); if (cpu < 0) { gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); -- cgit v1.2.3 From dd34f3caf6648290c415864e8df1b8b2d5e8882a Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 9 Nov 2017 11:41:59 -0800 Subject: Inline closure --- CMakeLists.txt | 6 - Makefile | 6 - build.yaml | 1 - config.m4 | 1 - config.w32 | 1 - gRPC-Core.podspec | 1 - grpc.gemspec | 1 - grpc.gyp | 4 - package.xml | 1 - src/core/lib/iomgr/closure.cc | 219 -------------------- src/core/lib/iomgr/closure.h | 230 ++++++++++++++++++--- src/python/grpcio/grpc_core_dependencies.py | 1 - tools/doxygen/Doxyfile.core.internal | 1 - tools/run_tests/generated/sources_and_headers.json | 1 - 14 files changed, 198 insertions(+), 276 deletions(-) delete mode 100644 src/core/lib/iomgr/closure.cc (limited to 'src/core/lib') diff --git a/CMakeLists.txt b/CMakeLists.txt index 4daff95efd..c180fdc7fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -974,7 +974,6 @@ add_library(grpc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/closure.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc src/core/lib/iomgr/endpoint_pair_posix.cc @@ -1315,7 +1314,6 @@ add_library(grpc_cronet src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/closure.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc src/core/lib/iomgr/endpoint_pair_posix.cc @@ -1637,7 +1635,6 @@ add_library(grpc_test_util src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/closure.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc src/core/lib/iomgr/endpoint_pair_posix.cc @@ -1903,7 +1900,6 @@ add_library(grpc_test_util_unsecure src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/closure.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc src/core/lib/iomgr/endpoint_pair_posix.cc @@ -2155,7 +2151,6 @@ add_library(grpc_unsecure src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/closure.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc src/core/lib/iomgr/endpoint_pair_posix.cc @@ -2906,7 +2901,6 @@ add_library(grpc++_cronet src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/closure.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc src/core/lib/iomgr/endpoint_pair_posix.cc diff --git a/Makefile b/Makefile index 1e3418e410..124e74cf4a 100644 --- a/Makefile +++ b/Makefile @@ -2959,7 +2959,6 @@ LIBGRPC_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ @@ -3299,7 +3298,6 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ @@ -3619,7 +3617,6 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ @@ -3875,7 +3872,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ @@ -4104,7 +4100,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ @@ -4833,7 +4828,6 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ diff --git a/build.yaml b/build.yaml index cd6486ea29..c62b5d44fe 100644 --- a/build.yaml +++ b/build.yaml @@ -168,7 +168,6 @@ filegroups: - src/core/lib/http/httpcli.cc - src/core/lib/http/parser.cc - src/core/lib/iomgr/call_combiner.cc - - src/core/lib/iomgr/closure.cc - src/core/lib/iomgr/combiner.cc - src/core/lib/iomgr/endpoint.cc - src/core/lib/iomgr/endpoint_pair_posix.cc diff --git a/config.m4 b/config.m4 index 5cb46cf054..d2f2520fea 100644 --- a/config.m4 +++ b/config.m4 @@ -104,7 +104,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ - src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ diff --git a/config.w32 b/config.w32 index 0fc5066b29..8a713751dc 100644 --- a/config.w32 +++ b/config.w32 @@ -81,7 +81,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\http\\httpcli.cc " + "src\\core\\lib\\http\\parser.cc " + "src\\core\\lib\\iomgr\\call_combiner.cc " + - "src\\core\\lib\\iomgr\\closure.cc " + "src\\core\\lib\\iomgr\\combiner.cc " + "src\\core\\lib\\iomgr\\endpoint.cc " + "src\\core\\lib\\iomgr\\endpoint_pair_posix.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 01b8d65977..02c6a64515 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -478,7 +478,6 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', - 'src/core/lib/iomgr/closure.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', diff --git a/grpc.gemspec b/grpc.gemspec index 2fe2536cc7..d674c09005 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -413,7 +413,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/httpcli.cc ) s.files += %w( src/core/lib/http/parser.cc ) s.files += %w( src/core/lib/iomgr/call_combiner.cc ) - s.files += %w( src/core/lib/iomgr/closure.cc ) s.files += %w( src/core/lib/iomgr/combiner.cc ) s.files += %w( src/core/lib/iomgr/endpoint.cc ) s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.cc ) diff --git a/grpc.gyp b/grpc.gyp index 7075aa9afd..de7af7bacd 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -245,7 +245,6 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', - 'src/core/lib/iomgr/closure.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', @@ -535,7 +534,6 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', - 'src/core/lib/iomgr/closure.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', @@ -743,7 +741,6 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', - 'src/core/lib/iomgr/closure.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', @@ -936,7 +933,6 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', - 'src/core/lib/iomgr/closure.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', diff --git a/package.xml b/package.xml index 9dee62f871..3356c271d8 100644 --- a/package.xml +++ b/package.xml @@ -425,7 +425,6 @@ - diff --git a/src/core/lib/iomgr/closure.cc b/src/core/lib/iomgr/closure.cc deleted file mode 100644 index 60e99d0e4e..0000000000 --- a/src/core/lib/iomgr/closure.cc +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/closure.h" - -#include -#include -#include - -#include "src/core/lib/profiling/timers.h" - -#ifndef NDEBUG -grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure"); -#endif - -#ifndef NDEBUG -grpc_closure* grpc_closure_init(const char* file, int line, - grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler) { -#else -grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler) { -#endif - closure->cb = cb; - closure->cb_arg = cb_arg; - closure->scheduler = scheduler; -#ifndef NDEBUG - closure->scheduled = false; - closure->file_initiated = NULL; - closure->line_initiated = 0; - closure->run = false; - closure->file_created = file; - closure->line_created = line; -#endif - return closure; -} - -void grpc_closure_list_init(grpc_closure_list* closure_list) { - closure_list->head = closure_list->tail = NULL; -} - -bool grpc_closure_list_append(grpc_closure_list* closure_list, - grpc_closure* closure, grpc_error* error) { - if (closure == NULL) { - GRPC_ERROR_UNREF(error); - return false; - } - closure->error_data.error = error; - closure->next_data.next = NULL; - bool was_empty = (closure_list->head == NULL); - if (was_empty) { - closure_list->head = closure; - } else { - closure_list->tail->next_data.next = closure; - } - closure_list->tail = closure; - return was_empty; -} - -void grpc_closure_list_fail_all(grpc_closure_list* list, - grpc_error* forced_failure) { - for (grpc_closure* c = list->head; c != NULL; c = c->next_data.next) { - if (c->error_data.error == GRPC_ERROR_NONE) { - c->error_data.error = GRPC_ERROR_REF(forced_failure); - } - } - GRPC_ERROR_UNREF(forced_failure); -} - -bool grpc_closure_list_empty(grpc_closure_list closure_list) { - return closure_list.head == NULL; -} - -void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst) { - if (src->head == NULL) { - return; - } - if (dst->head == NULL) { - *dst = *src; - } else { - dst->tail->next_data.next = src->head; - dst->tail = src->tail; - } - src->head = src->tail = NULL; -} - -typedef struct { - grpc_iomgr_cb_func cb; - void* cb_arg; - grpc_closure wrapper; -} wrapped_closure; - -static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - wrapped_closure* wc = (wrapped_closure*)arg; - grpc_iomgr_cb_func cb = wc->cb; - void* cb_arg = wc->cb_arg; - gpr_free(wc); - cb(exec_ctx, cb_arg, error); -} - -#ifndef NDEBUG -grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { -#else -grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { -#endif - wrapped_closure* wc = (wrapped_closure*)gpr_malloc(sizeof(*wc)); - wc->cb = cb; - wc->cb_arg = cb_arg; -#ifndef NDEBUG - grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler); -#else - grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); -#endif - return &wc->wrapper; -} - -#ifndef NDEBUG -void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* c, grpc_error* error) { -#else -void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, - grpc_error* error) { -#endif - GPR_TIMER_BEGIN("grpc_closure_run", 0); - if (c != NULL) { -#ifndef NDEBUG - c->file_initiated = file; - c->line_initiated = line; - c->run = true; -#endif - assert(c->cb); - c->scheduler->vtable->run(exec_ctx, c, error); - } else { - GRPC_ERROR_UNREF(error); - } - GPR_TIMER_END("grpc_closure_run", 0); -} - -#ifndef NDEBUG -void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* c, grpc_error* error) { -#else -void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, - grpc_error* error) { -#endif - GPR_TIMER_BEGIN("grpc_closure_sched", 0); - if (c != NULL) { -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; -#endif - assert(c->cb); - c->scheduler->vtable->sched(exec_ctx, c, error); - } else { - GRPC_ERROR_UNREF(error); - } - GPR_TIMER_END("grpc_closure_sched", 0); -} - -#ifndef NDEBUG -void grpc_closure_list_sched(const char* file, int line, - grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { -#else -void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { -#endif - grpc_closure* c = list->head; - while (c != NULL) { - grpc_closure* next = c->next_data.next; -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; -#endif - assert(c->cb); - c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); - c = next; - } - list->head = list->tail = NULL; -} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8b1188e2db..279aed8467 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -21,9 +21,13 @@ #include +#include #include +#include +#include #include #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/mpscq.h" #ifdef __cplusplus @@ -107,32 +111,82 @@ struct grpc_closure { #endif }; +#ifndef NDEBUG +inline grpc_closure* grpc_closure_init(const char* file, int line, + grpc_closure* closure, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#else +inline grpc_closure* grpc_closure_init(grpc_closure* closure, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#endif + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->scheduler = scheduler; +#ifndef NDEBUG + closure->scheduled = false; + closure->file_initiated = NULL; + closure->line_initiated = 0; + closure->run = false; + closure->file_created = file; + closure->line_created = line; +#endif + return closure; +} + /** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */ #ifndef NDEBUG -grpc_closure* grpc_closure_init(const char* file, int line, - grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler) #else -grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb, - void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ grpc_closure_init(closure, cb, cb_arg, scheduler) #endif +namespace { + +typedef struct { + grpc_iomgr_cb_func cb; + void* cb_arg; + grpc_closure wrapper; +} wrapped_closure; + +static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + wrapped_closure* wc = (wrapped_closure*)arg; + grpc_iomgr_cb_func cb = wc->cb; + void* cb_arg = wc->cb_arg; + gpr_free(wc); + cb(exec_ctx, cb_arg, error); +} + +} // anonymous namespace + +#ifndef NDEBUG +inline grpc_closure* grpc_closure_create(const char* file, int line, + grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#else +inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, + grpc_closure_scheduler* scheduler) { +#endif + wrapped_closure* wc = (wrapped_closure*)gpr_malloc(sizeof(*wc)); + wc->cb = cb; + wc->cb_arg = cb_arg; +#ifndef NDEBUG + grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler); +#else + grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); +#endif + return &wc->wrapper; +} + /* Create a heap allocated closure: try to avoid except for very rare events */ #ifndef NDEBUG -grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler) #else -grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler); #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ grpc_closure_create(cb, cb_arg, scheduler) #endif @@ -140,63 +194,175 @@ grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } -void grpc_closure_list_init(grpc_closure_list* list); +inline void grpc_closure_list_init(grpc_closure_list* closure_list) { + closure_list->head = closure_list->tail = NULL; +} /** add \a closure to the end of \a list and set \a closure's result to \a error Returns true if \a list becomes non-empty */ -bool grpc_closure_list_append(grpc_closure_list* list, grpc_closure* closure, - grpc_error* error); +inline bool grpc_closure_list_append(grpc_closure_list* closure_list, + grpc_closure* closure, grpc_error* error) { + if (closure == NULL) { + GRPC_ERROR_UNREF(error); + return false; + } + closure->error_data.error = error; + closure->next_data.next = NULL; + bool was_empty = (closure_list->head == NULL); + if (was_empty) { + closure_list->head = closure; + } else { + closure_list->tail->next_data.next = closure; + } + closure_list->tail = closure; + return was_empty; +} /** force all success bits in \a list to false */ -void grpc_closure_list_fail_all(grpc_closure_list* list, - grpc_error* forced_failure); +inline void grpc_closure_list_fail_all(grpc_closure_list* list, + grpc_error* forced_failure) { + for (grpc_closure* c = list->head; c != NULL; c = c->next_data.next) { + if (c->error_data.error == GRPC_ERROR_NONE) { + c->error_data.error = GRPC_ERROR_REF(forced_failure); + } + } + GRPC_ERROR_UNREF(forced_failure); +} /** append all closures from \a src to \a dst and empty \a src. */ -void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst); +inline void grpc_closure_list_move(grpc_closure_list* src, + grpc_closure_list* dst) { + if (src->head == NULL) { + return; + } + if (dst->head == NULL) { + *dst = *src; + } else { + dst->tail->next_data.next = src->head; + dst->tail = src->tail; + } + src->head = src->tail = NULL; +} /** return whether \a list is empty. */ -bool grpc_closure_list_empty(grpc_closure_list list); +inline bool grpc_closure_list_empty(grpc_closure_list closure_list) { + return closure_list.head == NULL; +} + +#ifndef NDEBUG +inline void grpc_closure_run(const char* file, int line, + grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#else +inline void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#endif + GPR_TIMER_BEGIN("grpc_closure_run", 0); + if (c != NULL) { +#ifndef NDEBUG + c->file_initiated = file; + c->line_initiated = line; + c->run = true; +#endif + assert(c->cb); + c->scheduler->vtable->run(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_run", 0); +} /** Run a closure directly. Caller ensures that no locks are being held above. * Note that calling this at the end of a closure callback function itself is * by definition safe. */ #ifndef NDEBUG -void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* closure, grpc_error* error); #define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error) #else -void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure, - grpc_error* error); #define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ grpc_closure_run(exec_ctx, closure, error) #endif +#ifndef NDEBUG +inline void grpc_closure_sched(const char* file, int line, + grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#else +inline void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, + grpc_error* error) { +#endif + GPR_TIMER_BEGIN("grpc_closure_sched", 0); + if (c != NULL) { +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; +#endif + assert(c->cb); + c->scheduler->vtable->sched(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_sched", 0); +} + /** Schedule a closure to be run. Does not need to be run from a safe point. */ #ifndef NDEBUG -void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx, - grpc_closure* closure, grpc_error* error); #define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error) #else -void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, - grpc_error* error); #define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ grpc_closure_sched(exec_ctx, closure, error) #endif +#ifndef NDEBUG +inline void grpc_closure_list_sched(const char* file, int line, + grpc_exec_ctx* exec_ctx, + grpc_closure_list* list) { +#else +inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, + grpc_closure_list* list) { +#endif + grpc_closure* c = list->head; + while (c != NULL) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; +#endif + assert(c->cb); + c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); + c = next; + } + list->head = list->tail = NULL; +} + /** Schedule all closures in a list to be run. Does not need to be run from a * safe point. */ #ifndef NDEBUG -void grpc_closure_list_sched(const char* file, int line, - grpc_exec_ctx* exec_ctx, - grpc_closure_list* closure_list); #define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list) #else -void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, - grpc_closure_list* closure_list); #define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ grpc_closure_list_sched(exec_ctx, closure_list) #endif diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index bb7d990078..330c4185c6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -80,7 +80,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', - 'src/core/lib/iomgr/closure.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index b9844f8b89..b8434bb819 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1073,7 +1073,6 @@ src/core/lib/iomgr/README.md \ src/core/lib/iomgr/block_annotate.h \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/call_combiner.h \ -src/core/lib/iomgr/closure.cc \ src/core/lib/iomgr/closure.h \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/combiner.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 6cd9a04056..5b4af26e8e 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7930,7 +7930,6 @@ "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", "src/core/lib/iomgr/call_combiner.cc", - "src/core/lib/iomgr/closure.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", "src/core/lib/iomgr/endpoint_pair_posix.cc", -- cgit v1.2.3 From 429a134f3d7ba2eb81c39e13f5aeea691e3e1104 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 10 Nov 2017 16:21:03 -0800 Subject: Fix trace_closure linker error --- src/core/lib/iomgr/error.cc | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 123ff72851..ecb3352d0a 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -40,6 +40,8 @@ #ifndef NDEBUG grpc_tracer_flag grpc_trace_error_refcount = GRPC_TRACER_INITIALIZER(false, "error_refcount"); +grpc_tracer_flag grpc_trace_closure = + GRPC_TRACER_INITIALIZER(false, "closure"); #endif static const char* error_int_name(grpc_error_ints key) { -- cgit v1.2.3 From b945cc443132c9f9797f1215e2d60bb051c07366 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Sun, 12 Nov 2017 20:18:18 -0800 Subject: clang fmt --- src/core/lib/iomgr/error.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index ecb3352d0a..a55438e949 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -40,8 +40,7 @@ #ifndef NDEBUG grpc_tracer_flag grpc_trace_error_refcount = GRPC_TRACER_INITIALIZER(false, "error_refcount"); -grpc_tracer_flag grpc_trace_closure = - GRPC_TRACER_INITIALIZER(false, "closure"); +grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure"); #endif static const char* error_int_name(grpc_error_ints key) { -- cgit v1.2.3 From 4023fb93577b1aba2f097449d66a6d54896d778b Mon Sep 17 00:00:00 2001 From: Noah Eisen Date: Mon, 13 Nov 2017 17:24:37 -0800 Subject: clang tidy --- src/core/lib/iomgr/closure.h | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 279aed8467..5304158f17 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -89,7 +89,7 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void* cb_arg; - /** Scheduler to schedule against: NULL to schedule against current execution + /** Scheduler to schedule against: nullptr to schedule against current execution context */ grpc_closure_scheduler* scheduler; @@ -126,7 +126,7 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure, closure->scheduler = scheduler; #ifndef NDEBUG closure->scheduled = false; - closure->file_initiated = NULL; + closure->file_initiated = nullptr; closure->line_initiated = 0; closure->run = false; closure->file_created = file; @@ -192,10 +192,10 @@ inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, #endif #define GRPC_CLOSURE_LIST_INIT \ - { NULL, NULL } + { nullptr, nullptr } inline void grpc_closure_list_init(grpc_closure_list* closure_list) { - closure_list->head = closure_list->tail = NULL; + closure_list->head = closure_list->tail = nullptr; } /** add \a closure to the end of \a list @@ -203,13 +203,13 @@ inline void grpc_closure_list_init(grpc_closure_list* closure_list) { Returns true if \a list becomes non-empty */ inline bool grpc_closure_list_append(grpc_closure_list* closure_list, grpc_closure* closure, grpc_error* error) { - if (closure == NULL) { + if (closure == nullptr) { GRPC_ERROR_UNREF(error); return false; } closure->error_data.error = error; - closure->next_data.next = NULL; - bool was_empty = (closure_list->head == NULL); + closure->next_data.next = nullptr; + bool was_empty = (closure_list->head == nullptr); if (was_empty) { closure_list->head = closure; } else { @@ -222,7 +222,7 @@ inline bool grpc_closure_list_append(grpc_closure_list* closure_list, /** force all success bits in \a list to false */ inline void grpc_closure_list_fail_all(grpc_closure_list* list, grpc_error* forced_failure) { - for (grpc_closure* c = list->head; c != NULL; c = c->next_data.next) { + for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) { if (c->error_data.error == GRPC_ERROR_NONE) { c->error_data.error = GRPC_ERROR_REF(forced_failure); } @@ -233,21 +233,21 @@ inline void grpc_closure_list_fail_all(grpc_closure_list* list, /** append all closures from \a src to \a dst and empty \a src. */ inline void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst) { - if (src->head == NULL) { + if (src->head == nullptr) { return; } - if (dst->head == NULL) { + if (dst->head == nullptr) { *dst = *src; } else { dst->tail->next_data.next = src->head; dst->tail = src->tail; } - src->head = src->tail = NULL; + src->head = src->tail = nullptr; } /** return whether \a list is empty. */ inline bool grpc_closure_list_empty(grpc_closure_list closure_list) { - return closure_list.head == NULL; + return closure_list.head == nullptr; } #ifndef NDEBUG @@ -259,7 +259,7 @@ inline void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c, grpc_error* error) { #endif GPR_TIMER_BEGIN("grpc_closure_run", 0); - if (c != NULL) { + if (c != nullptr) { #ifndef NDEBUG c->file_initiated = file; c->line_initiated = line; @@ -293,7 +293,7 @@ inline void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c, grpc_error* error) { #endif GPR_TIMER_BEGIN("grpc_closure_sched", 0); - if (c != NULL) { + if (c != nullptr) { #ifndef NDEBUG if (c->scheduled) { gpr_log(GPR_ERROR, @@ -334,7 +334,7 @@ inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, grpc_closure_list* list) { #endif grpc_closure* c = list->head; - while (c != NULL) { + while (c != nullptr) { grpc_closure* next = c->next_data.next; #ifndef NDEBUG if (c->scheduled) { @@ -354,7 +354,7 @@ inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); c = next; } - list->head = list->tail = NULL; + list->head = list->tail = nullptr; } /** Schedule all closures in a list to be run. Does not need to be run from a -- cgit v1.2.3 From 054426cc9ba68925ac61f41846d4dfd10091da92 Mon Sep 17 00:00:00 2001 From: Noah Eisen Date: Mon, 13 Nov 2017 17:25:05 -0800 Subject: clang fmt --- src/core/lib/iomgr/closure.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 5304158f17..50096f162f 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -89,8 +89,8 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void* cb_arg; - /** Scheduler to schedule against: nullptr to schedule against current execution - context */ + /** Scheduler to schedule against: nullptr to schedule against current + execution context */ grpc_closure_scheduler* scheduler; /** Once queued, the result of the closure. Before then: scratch space */ -- cgit v1.2.3 From ce0ad22f8735581afdb8ce8d78c4293a8b360d2c Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 15 Nov 2017 16:15:12 -0800 Subject: Reviewer feedback --- src/core/lib/iomgr/closure.h | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 50096f162f..8ecb5b3583 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -144,7 +144,7 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_closure_init(closure, cb, cb_arg, scheduler) #endif -namespace { +namespace closure_impl { typedef struct { grpc_iomgr_cb_func cb; @@ -152,8 +152,7 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { wrapped_closure* wc = (wrapped_closure*)arg; grpc_iomgr_cb_func cb = wc->cb; void* cb_arg = wc->cb_arg; @@ -161,7 +160,7 @@ static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, cb(exec_ctx, cb_arg, error); } -} // anonymous namespace +} // namespace closure_impl #ifndef NDEBUG inline grpc_closure* grpc_closure_create(const char* file, int line, @@ -171,13 +170,15 @@ inline grpc_closure* grpc_closure_create(const char* file, int line, inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, grpc_closure_scheduler* scheduler) { #endif - wrapped_closure* wc = (wrapped_closure*)gpr_malloc(sizeof(*wc)); + closure_impl::wrapped_closure* wc = + (closure_impl::wrapped_closure*)gpr_malloc(sizeof(*wc)); wc->cb = cb; wc->cb_arg = cb_arg; #ifndef NDEBUG - grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler); + grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, wc, + scheduler); #else - grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); + grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc, scheduler); #endif return &wc->wrapper; } -- cgit v1.2.3 From 232af482b5b8d23de247d051bf206b3a46292552 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 16 Nov 2017 11:09:15 -0800 Subject: inline helper and remove extern c --- src/core/lib/iomgr/closure.h | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8ecb5b3583..8d6bdc98d8 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -30,10 +30,6 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/mpscq.h" -#ifdef __cplusplus -extern "C" { -#endif - struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -152,7 +148,7 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +inline void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { wrapped_closure* wc = (wrapped_closure*)arg; grpc_iomgr_cb_func cb = wc->cb; void* cb_arg = wc->cb_arg; @@ -368,8 +364,4 @@ inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, grpc_closure_list_sched(exec_ctx, closure_list) #endif -#ifdef __cplusplus -} -#endif - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ -- cgit v1.2.3 From 3035950cb47cd347f8532d4d9986cef3c2b0be2a Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 16 Nov 2017 11:48:10 -0800 Subject: clang fmt --- src/core/lib/iomgr/closure.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8d6bdc98d8..8b99d4a599 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -148,7 +148,8 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -inline void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +inline void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { wrapped_closure* wc = (wrapped_closure*)arg; grpc_iomgr_cb_func cb = wc->cb; void* cb_arg = wc->cb_arg; -- cgit v1.2.3 From 24908a61e413cea9cf0edf953802fc9643420777 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Thu, 16 Nov 2017 12:35:43 -0800 Subject: Use lock when popping requests on server shutdown Doing this without a lock causes TSAN failures for quic. There isn't much need to be clever here because this only impacts shutdown performance, which doesn't really matter. --- src/core/lib/surface/server.cc | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index b816439770..2e8609ee9e 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -350,14 +350,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx, grpc_error* error) { requested_call* rc; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call*)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != nullptr) { + while ((rc = (requested_call*)gpr_locked_mpscq_pop( + &rm->requests_per_cq[i])) != nullptr) { fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); } } -- cgit v1.2.3 From 3dc4bc2f06d9de156c9e61da00b9ce1637a1bee2 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 16 Nov 2017 18:25:43 -0800 Subject: Fix uses of resource quota in UV TCP code --- src/core/lib/iomgr/resource_quota.cc | 7 -- src/core/lib/iomgr/tcp_uv.cc | 126 +++++++++++++++++++++-------------- 2 files changed, 76 insertions(+), 57 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 9a44fa203c..c2179cf29a 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -896,10 +896,3 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length, &slice_allocator->on_allocated); } - -grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, - size_t size) { - grpc_resource_user_alloc(exec_ctx, resource_user, size, nullptr); - return ru_slice_create(resource_user, size); -} diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index ac9ca4ea11..2d3295fe2a 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -52,12 +52,12 @@ typedef struct { grpc_closure* read_cb; grpc_closure* write_cb; - grpc_slice read_slice; grpc_slice_buffer* read_slices; grpc_slice_buffer* write_slices; uv_buf_t* write_buffers; grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; bool shutting_down; @@ -66,7 +66,6 @@ typedef struct { } grpc_tcp; static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->handle); gpr_free(tcp->peer_string); @@ -119,91 +118,120 @@ static void uv_close_callback(uv_handle_t* handle) { grpc_exec_ctx_finish(&exec_ctx); } -static grpc_slice alloc_read_slice(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user) { - return grpc_resource_user_slice_malloc(exec_ctx, resource_user, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); -} - static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp* tcp = (grpc_tcp*)handle->data; (void)suggested_size; - buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slice); - buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); + /* Before calling uv_read_start, we allocate a buffer with exactly one slice + * to tcp->read_slices and wait for the callback indicating that the + * allocation was successful. So slices[0] should always exist here */ + buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]); + buf->len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]); grpc_exec_ctx_finish(&exec_ctx); } +static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, + grpc_error* error) { + grpc_closure* cb = tcp->read_cb; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, + dump); + gpr_free(dump); + } + } + tcp->read_slices = NULL; + tcp->read_cb = NULL; + GRPC_CLOSURE_RUN(exec_ctx, cb, error); +} + static void read_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - grpc_slice sub; grpc_error* error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp* tcp = (grpc_tcp*)stream->data; - grpc_closure* cb = tcp->read_cb; + grpc_slice_buffer garbage; if (nread == 0) { // Nothing happened. Wait for the next callback return; } TCP_UNREF(&exec_ctx, tcp, "read"); - tcp->read_cb = NULL; // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); if (nread == UV_EOF) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices); } else if (nread > 0) { // Successful read - sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); - grpc_slice_buffer_add(tcp->read_slices, sub); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); error = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { - size_t i; - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "read: error=%s", str); - - for (i = 0; i < tcp->read_slices->count; i++) { - char* dump = grpc_dump_slice(tcp->read_slices->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, - dump); - gpr_free(dump); - } + if ((size_t)nread < tcp->read_slices->length) { + /* TODO(murgatroid99): Instead of discarding the unused part of the read + * buffer, reuse it as the next read buffer. */ + grpc_slice_buffer_init(&garbage); + grpc_slice_buffer_trim_end( + tcp->read_slices, + tcp->read_slices->length - (size_t)nread, + &garbage); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &garbage); } } else { // nread < 0: Error error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices); } - GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); + call_read_cb(&exec_ctx, tcp, error); grpc_exec_ctx_finish(&exec_ctx); } +static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, + grpc_error* error) { + int status; + grpc_tcp* tcp = (grpc_tcp*)tcpp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } + if (error == GRPC_ERROR_NONE) { + status = + uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + } + } + if (error != GRPC_ERROR_NONE) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->read_slices); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(exec_ctx, tcp, "read"); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); + } +} + static void uv_endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_slice_buffer* read_slices, grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; - int status; - grpc_error* error = GRPC_ERROR_NONE; GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices); TCP_REF(tcp, "read"); - // TODO(murgatroid99): figure out what the return value here means - status = - uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); - if (status != 0) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); - GRPC_CLOSURE_SCHED(exec_ctx, cb, error); - } - if (GRPC_TRACER_ON(grpc_tcp_trace)) { - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); - } + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + tcp->read_slices); } static void write_callback(uv_write_t* req, int status) { @@ -223,8 +251,6 @@ static void write_callback(uv_write_t* req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - grpc_resource_user_free(&exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * tcp->write_slices->count); GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -271,8 +297,6 @@ static void uv_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * buffer_count); - grpc_resource_user_alloc(exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; buffers[i].base = (char*)GRPC_SLICE_START_PTR(*slice); @@ -381,8 +405,10 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle, gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; + tcp->read_slices = NULL; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); -- cgit v1.2.3 From 5dc3cc1b01e908b5635e43a39bd914516c2a0071 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 17 Nov 2017 09:40:57 -0800 Subject: Clang format --- src/core/lib/iomgr/tcp_uv.cc | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index 2d3295fe2a..de7ec430c8 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -143,8 +143,7 @@ static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, for (i = 0; i < tcp->read_slices->count; i++) { char* dump = grpc_dump_slice(tcp->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, - dump); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); } } @@ -177,9 +176,7 @@ static void read_callback(uv_stream_t* stream, ssize_t nread, * buffer, reuse it as the next read buffer. */ grpc_slice_buffer_init(&garbage); grpc_slice_buffer_trim_end( - tcp->read_slices, - tcp->read_slices->length - (size_t)nread, - &garbage); + tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage); grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &garbage); } } else { @@ -204,9 +201,9 @@ static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback); if (status != 0) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); + error = grpc_error_set_str( + error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); } } if (error != GRPC_ERROR_NONE) { -- cgit v1.2.3 From 9f4759ac144db54c638a7dc9d1c1ee7ba6208c7a Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 16 Nov 2017 21:35:54 -0800 Subject: Disable caching start-time for all clocktypes except GPR_CLOCK_MONOTONIC Caching the start-time for GPR_CLOCK_REALTIME has been causing errors in cases where the system time is changed (after caching the time). In such cases, the following functions produce incorrect results (and are off by how much ever the system time was changed) grpc_millis_to_timespec() and grpc_timespec_to_millis_round_down() This can cause problems especially when using the above functions to get timer deadlines or completion queue timeouts. (In the worst case scenarios, the timeouts/deadlines will always occur (if the timeout inverval / deadline was less than the system change delta) Ideally we should be reverting https://github.com/grpc/grpc/pull/11866 but since that is a large change (which introduced new APIs in exec_ctx.cc), I am doing this change to effectively revert to the old behavior (while still keeping the new APIs introduced in exec_ctx) --- src/core/lib/iomgr/exec_ctx.cc | 81 +++++-------------------------------- src/core/lib/iomgr/exec_ctx.h | 2 - src/core/lib/iomgr/timer_manager.cc | 4 -- 3 files changed, 10 insertions(+), 77 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 27c769a7ed..257ada0f65 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -25,9 +25,6 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" -#define GRPC_START_TIME_UPDATE_INTERVAL 10000 -extern "C" grpc_tracer_flag grpc_timer_check_trace; - bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx* exec_ctx) { if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { if (exec_ctx->check_ready_to_finish(exec_ctx, @@ -107,49 +104,16 @@ static void exec_ctx_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -/* This time pair is not entirely thread-safe as store/load of tv_sec and - * tv_nsec are performed separately. However g_start_time do not need to have - * sub-second precision, so it is ok if the value of tv_nsec is off in this - * case. */ -typedef struct time_atm_pair { - gpr_atm tv_sec; - gpr_atm tv_nsec; -} time_atm_pair; - -static time_atm_pair - g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the - // last enum value in - // gpr_clock_type -static grpc_millis g_last_start_time_update; - -static gpr_timespec timespec_from_time_atm_pair(const time_atm_pair* src, - gpr_clock_type clock_type) { - gpr_timespec time; - time.tv_nsec = (int32_t)gpr_atm_no_barrier_load(&src->tv_nsec); - time.tv_sec = (int64_t)gpr_atm_no_barrier_load(&src->tv_sec); - time.clock_type = clock_type; - return time; -} - -static void time_atm_pair_store(time_atm_pair* dst, const gpr_timespec src) { - gpr_atm_no_barrier_store(&dst->tv_sec, src.tv_sec); - gpr_atm_no_barrier_store(&dst->tv_nsec, src.tv_nsec); -} +static gpr_timespec g_start_time; void grpc_exec_ctx_global_init(void) { - for (int i = 0; i < GPR_TIMESPAN; i++) { - time_atm_pair_store(&g_start_time[i], gpr_now((gpr_clock_type)i)); - } - // allows uniform treatment in conversion functions - time_atm_pair_store(&g_start_time[GPR_TIMESPAN], gpr_time_0(GPR_TIMESPAN)); + g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); } void grpc_exec_ctx_global_shutdown(void) {} static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type); - ts = gpr_time_sub(ts, start_time); + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; if (x < 0) return 0; @@ -158,9 +122,7 @@ static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { } static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type); - ts = gpr_time_sub(ts, start_time); + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; @@ -195,41 +157,18 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, if (clock_type == GPR_TIMESPAN) { return gpr_time_from_millis(millis, GPR_TIMESPAN); } - gpr_timespec start_time = - timespec_from_time_atm_pair(&g_start_time[clock_type], clock_type); - return gpr_time_add(start_time, gpr_time_from_millis(millis, GPR_TIMESPAN)); + return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type), + gpr_time_from_millis(millis, GPR_TIMESPAN)); } grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { - return timespec_to_atm_round_down(ts); + return timespec_to_atm_round_down( + gpr_convert_clock_type(ts, g_start_time.clock_type)); } grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { - return timespec_to_atm_round_up(ts); -} - -void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx) { - grpc_millis now = grpc_exec_ctx_now(exec_ctx); - grpc_millis last_start_time_update = - gpr_atm_no_barrier_load(&g_last_start_time_update); - - if (now > last_start_time_update && - now - last_start_time_update > GRPC_START_TIME_UPDATE_INTERVAL) { - /* Get the current system time and subtract \a now from it, where \a now is - * the relative time from grpc_init() from monotonic clock. This calibrates - * the time when grpc_exec_ctx_global_init was called based on current - * system clock. */ - gpr_atm_no_barrier_store(&g_last_start_time_update, now); - gpr_timespec real_now = gpr_now(GPR_CLOCK_REALTIME); - gpr_timespec real_start_time = - gpr_time_sub(real_now, gpr_time_from_millis(now, GPR_TIMESPAN)); - time_atm_pair_store(&g_start_time[GPR_CLOCK_REALTIME], real_start_time); - - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "Update realtime clock start time: %" PRId64 "s %dns", - real_start_time.tv_sec, real_start_time.tv_nsec); - } - } + return timespec_to_atm_round_up( + gpr_convert_clock_type(ts, g_start_time.clock_type)); } static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 6035e08361..bd27506152 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -124,8 +124,6 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); -void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx); - #ifdef __cplusplus } #endif diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index acc40b6c9e..163c9b5211 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -226,10 +226,6 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) { grpc_millis next = GRPC_MILLIS_INF_FUTURE; grpc_exec_ctx_invalidate_now(exec_ctx); - /* Calibrate g_start_time in exec_ctx.cc with a regular interval in case the - * system clock has changed */ - grpc_exec_ctx_maybe_update_start_time(exec_ctx); - // check timer state, updates next to the next time to run a check switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: -- cgit v1.2.3