aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/block_annotate.h8
-rw-r--r--src/core/lib/iomgr/call_combiner.cc31
-rw-r--r--src/core/lib/iomgr/call_combiner.h10
-rw-r--r--src/core/lib/iomgr/closure.cc219
-rw-r--r--src/core/lib/iomgr/closure.h250
-rw-r--r--src/core/lib/iomgr/combiner.cc17
-rw-r--r--src/core/lib/iomgr/combiner.h10
-rw-r--r--src/core/lib/iomgr/endpoint.h8
-rw-r--r--src/core/lib/iomgr/endpoint_pair.h8
-rw-r--r--src/core/lib/iomgr/error.cc17
-rw-r--r--src/core/lib/iomgr/error.h12
-rw-r--r--src/core/lib/iomgr/error_internal.h8
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc62
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.h8
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc82
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.h8
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc24
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.h8
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc12
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.h8
-rw-r--r--src/core/lib/iomgr/ev_posix.cc15
-rw-r--r--src/core/lib/iomgr/ev_posix.h10
-rw-r--r--src/core/lib/iomgr/ev_windows.cc4
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc85
-rw-r--r--src/core/lib/iomgr/exec_ctx.h10
-rw-r--r--src/core/lib/iomgr/executor.cc24
-rw-r--r--src/core/lib/iomgr/executor.h8
-rw-r--r--src/core/lib/iomgr/fork_posix.cc90
-rw-r--r--src/core/lib/iomgr/fork_windows.cc39
-rw-r--r--src/core/lib/iomgr/gethostname.h8
-rw-r--r--src/core/lib/iomgr/iocp_windows.h8
-rw-r--r--src/core/lib/iomgr/iomgr.h8
-rw-r--r--src/core/lib/iomgr/iomgr_internal.h8
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc1
-rw-r--r--src/core/lib/iomgr/iomgr_uv.cc2
-rw-r--r--src/core/lib/iomgr/iomgr_uv.h8
-rw-r--r--src/core/lib/iomgr/load_file.h8
-rw-r--r--src/core/lib/iomgr/lockfree_event.cc14
-rw-r--r--src/core/lib/iomgr/lockfree_event.h7
-rw-r--r--src/core/lib/iomgr/polling_entity.h7
-rw-r--r--src/core/lib/iomgr/pollset.h12
-rw-r--r--src/core/lib/iomgr/pollset_set.h8
-rw-r--r--src/core/lib/iomgr/pollset_uv.cc5
-rw-r--r--src/core/lib/iomgr/pollset_uv.h8
-rw-r--r--src/core/lib/iomgr/pollset_windows.cc5
-rw-r--r--src/core/lib/iomgr/pollset_windows.h8
-rw-r--r--src/core/lib/iomgr/port.h4
-rw-r--r--src/core/lib/iomgr/resolve_address.h8
-rw-r--r--src/core/lib/iomgr/resource_quota.cc28
-rw-r--r--src/core/lib/iomgr/resource_quota.h10
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.cc2
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.h8
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.h8
-rw-r--r--src/core/lib/iomgr/socket_mutator.h8
-rw-r--r--src/core/lib/iomgr/socket_utils.h8
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h8
-rw-r--r--src/core/lib/iomgr/socket_windows.h8
-rw-r--r--src/core/lib/iomgr/tcp_client.h8
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc10
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.h8
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.cc8
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc93
-rw-r--r--src/core/lib/iomgr/tcp_posix.h10
-rw-r--r--src/core/lib/iomgr/tcp_server.h8
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix.h8
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.cc108
-rw-r--r--src/core/lib/iomgr/tcp_uv.cc137
-rw-r--r--src/core/lib/iomgr/tcp_uv.h10
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc6
-rw-r--r--src/core/lib/iomgr/tcp_windows.h8
-rw-r--r--src/core/lib/iomgr/time_averaged_stats.h8
-rw-r--r--src/core/lib/iomgr/timer.h8
-rw-r--r--src/core/lib/iomgr/timer_generic.cc39
-rw-r--r--src/core/lib/iomgr/timer_heap.h8
-rw-r--r--src/core/lib/iomgr/timer_manager.cc31
-rw-r--r--src/core/lib/iomgr/timer_manager.h8
-rw-r--r--src/core/lib/iomgr/timer_uv.cc7
-rw-r--r--src/core/lib/iomgr/udp_server.cc110
-rw-r--r--src/core/lib/iomgr/udp_server.h19
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.h8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_pipe.h8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.h8
86 files changed, 868 insertions, 1150 deletions
diff --git a/src/core/lib/iomgr/block_annotate.h b/src/core/lib/iomgr/block_annotate.h
index fcbfe9eb1a..340ebcb1af 100644
--- a/src/core/lib/iomgr/block_annotate.h
+++ b/src/core/lib/iomgr/block_annotate.h
@@ -19,17 +19,9 @@
#ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H
#define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H
-#ifdef __cplusplus
-extern "C" {
-#endif
-
void gpr_thd_start_blocking_region();
void gpr_thd_end_blocking_region();
-#ifdef __cplusplus
-}
-#endif
-
/* These annotations identify the beginning and end of regions where
the code may block for reasons other than synchronization functions.
These include poll, epoll, and getaddrinfo. */
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index 74b077de06..b5910b42e4 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -24,8 +24,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"
-grpc_tracer_flag grpc_call_combiner_trace =
- GRPC_TRACER_INITIALIZER(false, "call_combiner");
+grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");
static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
if (cancel_state & 1) {
@@ -63,7 +62,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
grpc_error* error DEBUG_ARGS,
const char* reason) {
GPR_TIMER_BEGIN("call_combiner_start", 0);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG,
"==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
"%s] error=%s",
@@ -72,7 +71,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
}
size_t prev_size =
(size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size + 1);
}
@@ -80,13 +79,13 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
if (prev_size == 0) {
GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx);
GPR_TIMER_MARK("call_combiner_initiate", 0);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
}
// Queue was empty, so execute this closure immediately.
GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
} else {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " QUEUING");
}
// Queue was not empty, so add closure to queue.
@@ -100,21 +99,21 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner DEBUG_ARGS,
const char* reason) {
GPR_TIMER_BEGIN("call_combiner_stop", 0);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG,
"==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
call_combiner DEBUG_FMT_ARGS, reason);
}
size_t prev_size =
(size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size - 1);
}
GPR_ASSERT(prev_size >= 1);
if (prev_size > 1) {
while (true) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " checking queue");
}
bool empty;
@@ -123,19 +122,19 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
if (closure == nullptr) {
// This can happen either due to a race condition within the mpscq
// code or because of a race with grpc_call_combiner_start().
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " queue returned no result; checking again");
}
continue;
}
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
closure, grpc_error_string(closure->error_data.error));
}
GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
break;
}
- } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ } else if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, " queue empty");
}
GPR_TIMER_END("call_combiner_stop", 0);
@@ -152,7 +151,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
// If error is set, invoke the cancellation closure immediately.
// Otherwise, store the new closure.
if (original_error != GRPC_ERROR_NONE) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG,
"call_combiner=%p: scheduling notify_on_cancel callback=%p "
"for pre-existing cancellation",
@@ -163,7 +162,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
} else {
if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
(gpr_atm)closure)) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG, "call_combiner=%p: setting notify_on_cancel=%p",
call_combiner, closure);
}
@@ -172,7 +171,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
// up any resources they may be holding for the callback.
if (original_state != 0) {
closure = (grpc_closure*)original_state;
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG,
"call_combiner=%p: scheduling old cancel callback=%p",
call_combiner, closure);
@@ -201,7 +200,7 @@ void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
encode_cancel_state_error(error))) {
if (original_state != 0) {
grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_DEBUG,
"call_combiner=%p: scheduling notify_on_cancel callback=%p",
call_combiner, notify_on_cancel);
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 527f84fce0..c07af51c91 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -27,10 +27,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/mpscq.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
// A simple, lock-free mechanism for serializing activity related to a
// single call. This is similar to a combiner but is more lightweight.
//
@@ -40,7 +36,7 @@ extern "C" {
// when it is done with the action that was kicked off by the original
// callback.
-extern grpc_tracer_flag grpc_call_combiner_trace;
+extern grpc_core::TraceFlag grpc_call_combiner_trace;
typedef struct {
gpr_atm size; // size_t, num closures in queue or currently executing
@@ -122,8 +118,4 @@ void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_error* error);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.cc b/src/core/lib/iomgr/closure.cc
deleted file mode 100644
index 09257d258c..0000000000
--- a/src/core/lib/iomgr/closure.cc
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/closure.h"
-
-#include <assert.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/profiling/timers.h"
-
-#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure");
-#endif
-
-#ifndef NDEBUG
-grpc_closure* grpc_closure_init(const char* file, int line,
- grpc_closure* closure, grpc_iomgr_cb_func cb,
- void* cb_arg,
- grpc_closure_scheduler* scheduler) {
-#else
-grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb,
- void* cb_arg,
- grpc_closure_scheduler* scheduler) {
-#endif
- closure->cb = cb;
- closure->cb_arg = cb_arg;
- closure->scheduler = scheduler;
-#ifndef NDEBUG
- closure->scheduled = false;
- closure->file_initiated = nullptr;
- closure->line_initiated = 0;
- closure->run = false;
- closure->file_created = file;
- closure->line_created = line;
-#endif
- return closure;
-}
-
-void grpc_closure_list_init(grpc_closure_list* closure_list) {
- closure_list->head = closure_list->tail = nullptr;
-}
-
-bool grpc_closure_list_append(grpc_closure_list* closure_list,
- grpc_closure* closure, grpc_error* error) {
- if (closure == nullptr) {
- GRPC_ERROR_UNREF(error);
- return false;
- }
- closure->error_data.error = error;
- closure->next_data.next = nullptr;
- bool was_empty = (closure_list->head == nullptr);
- if (was_empty) {
- closure_list->head = closure;
- } else {
- closure_list->tail->next_data.next = closure;
- }
- closure_list->tail = closure;
- return was_empty;
-}
-
-void grpc_closure_list_fail_all(grpc_closure_list* list,
- grpc_error* forced_failure) {
- for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
- if (c->error_data.error == GRPC_ERROR_NONE) {
- c->error_data.error = GRPC_ERROR_REF(forced_failure);
- }
- }
- GRPC_ERROR_UNREF(forced_failure);
-}
-
-bool grpc_closure_list_empty(grpc_closure_list closure_list) {
- return closure_list.head == nullptr;
-}
-
-void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst) {
- if (src->head == nullptr) {
- return;
- }
- if (dst->head == nullptr) {
- *dst = *src;
- } else {
- dst->tail->next_data.next = src->head;
- dst->tail = src->tail;
- }
- src->head = src->tail = nullptr;
-}
-
-typedef struct {
- grpc_iomgr_cb_func cb;
- void* cb_arg;
- grpc_closure wrapper;
-} wrapped_closure;
-
-static void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- wrapped_closure* wc = (wrapped_closure*)arg;
- grpc_iomgr_cb_func cb = wc->cb;
- void* cb_arg = wc->cb_arg;
- gpr_free(wc);
- cb(exec_ctx, cb_arg, error);
-}
-
-#ifndef NDEBUG
-grpc_closure* grpc_closure_create(const char* file, int line,
- grpc_iomgr_cb_func cb, void* cb_arg,
- grpc_closure_scheduler* scheduler) {
-#else
-grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg,
- grpc_closure_scheduler* scheduler) {
-#endif
- wrapped_closure* wc = (wrapped_closure*)gpr_malloc(sizeof(*wc));
- wc->cb = cb;
- wc->cb_arg = cb_arg;
-#ifndef NDEBUG
- grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler);
-#else
- grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler);
-#endif
- return &wc->wrapper;
-}
-
-#ifndef NDEBUG
-void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx,
- grpc_closure* c, grpc_error* error) {
-#else
-void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c,
- grpc_error* error) {
-#endif
- GPR_TIMER_BEGIN("grpc_closure_run", 0);
- if (c != nullptr) {
-#ifndef NDEBUG
- c->file_initiated = file;
- c->line_initiated = line;
- c->run = true;
-#endif
- assert(c->cb);
- c->scheduler->vtable->run(exec_ctx, c, error);
- } else {
- GRPC_ERROR_UNREF(error);
- }
- GPR_TIMER_END("grpc_closure_run", 0);
-}
-
-#ifndef NDEBUG
-void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx,
- grpc_closure* c, grpc_error* error) {
-#else
-void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c,
- grpc_error* error) {
-#endif
- GPR_TIMER_BEGIN("grpc_closure_sched", 0);
- if (c != nullptr) {
-#ifndef NDEBUG
- if (c->scheduled) {
- gpr_log(GPR_ERROR,
- "Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d] run?: %s",
- c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, c->run ? "true" : "false");
- abort();
- }
- c->scheduled = true;
- c->file_initiated = file;
- c->line_initiated = line;
- c->run = false;
-#endif
- assert(c->cb);
- c->scheduler->vtable->sched(exec_ctx, c, error);
- } else {
- GRPC_ERROR_UNREF(error);
- }
- GPR_TIMER_END("grpc_closure_sched", 0);
-}
-
-#ifndef NDEBUG
-void grpc_closure_list_sched(const char* file, int line,
- grpc_exec_ctx* exec_ctx, grpc_closure_list* list) {
-#else
-void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx, grpc_closure_list* list) {
-#endif
- grpc_closure* c = list->head;
- while (c != nullptr) {
- grpc_closure* next = c->next_data.next;
-#ifndef NDEBUG
- if (c->scheduled) {
- gpr_log(GPR_ERROR,
- "Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d] run?: %s",
- c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, c->run ? "true" : "false");
- abort();
- }
- c->scheduled = true;
- c->file_initiated = file;
- c->line_initiated = line;
- c->run = false;
-#endif
- assert(c->cb);
- c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
- c = next;
- }
- list->head = list->tail = nullptr;
-}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 8b1188e2db..46793dd2c5 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -21,21 +21,19 @@
#include <grpc/support/port_platform.h>
+#include <assert.h>
#include <grpc/impl/codegen/exec_ctx_fwd.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include <stdbool.h>
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/mpscq.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
-#ifndef NDEBUG
-extern grpc_tracer_flag grpc_trace_closure;
-#endif
+extern grpc_core::DebugOnlyTraceFlag grpc_trace_closure;
typedef struct grpc_closure_list {
grpc_closure* head;
@@ -85,8 +83,8 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void* cb_arg;
- /** Scheduler to schedule against: NULL to schedule against current execution
- context */
+ /** Scheduler to schedule against: nullptr to schedule against current
+ execution context */
grpc_closure_scheduler* scheduler;
/** Once queued, the result of the closure. Before then: scratch space */
@@ -107,102 +105,262 @@ struct grpc_closure {
#endif
};
+#ifndef NDEBUG
+inline grpc_closure* grpc_closure_init(const char* file, int line,
+ grpc_closure* closure,
+ grpc_iomgr_cb_func cb, void* cb_arg,
+ grpc_closure_scheduler* scheduler) {
+#else
+inline grpc_closure* grpc_closure_init(grpc_closure* closure,
+ grpc_iomgr_cb_func cb, void* cb_arg,
+ grpc_closure_scheduler* scheduler) {
+#endif
+ closure->cb = cb;
+ closure->cb_arg = cb_arg;
+ closure->scheduler = scheduler;
+#ifndef NDEBUG
+ closure->scheduled = false;
+ closure->file_initiated = nullptr;
+ closure->line_initiated = 0;
+ closure->run = false;
+ closure->file_created = file;
+ closure->line_created = line;
+#endif
+ return closure;
+}
+
/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
#ifndef NDEBUG
-grpc_closure* grpc_closure_init(const char* file, int line,
- grpc_closure* closure, grpc_iomgr_cb_func cb,
- void* cb_arg,
- grpc_closure_scheduler* scheduler);
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler)
#else
-grpc_closure* grpc_closure_init(grpc_closure* closure, grpc_iomgr_cb_func cb,
- void* cb_arg,
- grpc_closure_scheduler* scheduler);
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
grpc_closure_init(closure, cb, cb_arg, scheduler)
#endif
+namespace closure_impl {
+
+typedef struct {
+ grpc_iomgr_cb_func cb;
+ void* cb_arg;
+ grpc_closure wrapper;
+} wrapped_closure;
+
+inline void closure_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ wrapped_closure* wc = (wrapped_closure*)arg;
+ grpc_iomgr_cb_func cb = wc->cb;
+ void* cb_arg = wc->cb_arg;
+ gpr_free(wc);
+ cb(exec_ctx, cb_arg, error);
+}
+
+} // namespace closure_impl
+
+#ifndef NDEBUG
+inline grpc_closure* grpc_closure_create(const char* file, int line,
+ grpc_iomgr_cb_func cb, void* cb_arg,
+ grpc_closure_scheduler* scheduler) {
+#else
+inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg,
+ grpc_closure_scheduler* scheduler) {
+#endif
+ closure_impl::wrapped_closure* wc =
+ (closure_impl::wrapped_closure*)gpr_malloc(sizeof(*wc));
+ wc->cb = cb;
+ wc->cb_arg = cb_arg;
+#ifndef NDEBUG
+ grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, wc,
+ scheduler);
+#else
+ grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc, scheduler);
+#endif
+ return &wc->wrapper;
+}
+
/* Create a heap allocated closure: try to avoid except for very rare events */
#ifndef NDEBUG
-grpc_closure* grpc_closure_create(const char* file, int line,
- grpc_iomgr_cb_func cb, void* cb_arg,
- grpc_closure_scheduler* scheduler);
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler)
#else
-grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg,
- grpc_closure_scheduler* scheduler);
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
grpc_closure_create(cb, cb_arg, scheduler)
#endif
#define GRPC_CLOSURE_LIST_INIT \
- { NULL, NULL }
+ { nullptr, nullptr }
-void grpc_closure_list_init(grpc_closure_list* list);
+inline void grpc_closure_list_init(grpc_closure_list* closure_list) {
+ closure_list->head = closure_list->tail = nullptr;
+}
/** add \a closure to the end of \a list
and set \a closure's result to \a error
Returns true if \a list becomes non-empty */
-bool grpc_closure_list_append(grpc_closure_list* list, grpc_closure* closure,
- grpc_error* error);
+inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
+ grpc_closure* closure, grpc_error* error) {
+ if (closure == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return false;
+ }
+ closure->error_data.error = error;
+ closure->next_data.next = nullptr;
+ bool was_empty = (closure_list->head == nullptr);
+ if (was_empty) {
+ closure_list->head = closure;
+ } else {
+ closure_list->tail->next_data.next = closure;
+ }
+ closure_list->tail = closure;
+ return was_empty;
+}
/** force all success bits in \a list to false */
-void grpc_closure_list_fail_all(grpc_closure_list* list,
- grpc_error* forced_failure);
+inline void grpc_closure_list_fail_all(grpc_closure_list* list,
+ grpc_error* forced_failure) {
+ for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
+ if (c->error_data.error == GRPC_ERROR_NONE) {
+ c->error_data.error = GRPC_ERROR_REF(forced_failure);
+ }
+ }
+ GRPC_ERROR_UNREF(forced_failure);
+}
/** append all closures from \a src to \a dst and empty \a src. */
-void grpc_closure_list_move(grpc_closure_list* src, grpc_closure_list* dst);
+inline void grpc_closure_list_move(grpc_closure_list* src,
+ grpc_closure_list* dst) {
+ if (src->head == nullptr) {
+ return;
+ }
+ if (dst->head == nullptr) {
+ *dst = *src;
+ } else {
+ dst->tail->next_data.next = src->head;
+ dst->tail = src->tail;
+ }
+ src->head = src->tail = nullptr;
+}
/** return whether \a list is empty. */
-bool grpc_closure_list_empty(grpc_closure_list list);
+inline bool grpc_closure_list_empty(grpc_closure_list closure_list) {
+ return closure_list.head == nullptr;
+}
+
+#ifndef NDEBUG
+inline void grpc_closure_run(const char* file, int line,
+ grpc_exec_ctx* exec_ctx, grpc_closure* c,
+ grpc_error* error) {
+#else
+inline void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* c,
+ grpc_error* error) {
+#endif
+ GPR_TIMER_BEGIN("grpc_closure_run", 0);
+ if (c != nullptr) {
+#ifndef NDEBUG
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = true;
+#endif
+ assert(c->cb);
+ c->scheduler->vtable->run(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ GPR_TIMER_END("grpc_closure_run", 0);
+}
/** Run a closure directly. Caller ensures that no locks are being held above.
* Note that calling this at the end of a closure callback function itself is
* by definition safe. */
#ifndef NDEBUG
-void grpc_closure_run(const char* file, int line, grpc_exec_ctx* exec_ctx,
- grpc_closure* closure, grpc_error* error);
#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \
grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error)
#else
-void grpc_closure_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
- grpc_error* error);
#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \
grpc_closure_run(exec_ctx, closure, error)
#endif
+#ifndef NDEBUG
+inline void grpc_closure_sched(const char* file, int line,
+ grpc_exec_ctx* exec_ctx, grpc_closure* c,
+ grpc_error* error) {
+#else
+inline void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* c,
+ grpc_error* error) {
+#endif
+ GPR_TIMER_BEGIN("grpc_closure_sched", 0);
+ if (c != nullptr) {
+#ifndef NDEBUG
+ if (c->scheduled) {
+ gpr_log(GPR_ERROR,
+ "Closure already scheduled. (closure: %p, created: [%s:%d], "
+ "previously scheduled at: [%s: %d] run?: %s",
+ c, c->file_created, c->line_created, c->file_initiated,
+ c->line_initiated, c->run ? "true" : "false");
+ abort();
+ }
+ c->scheduled = true;
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = false;
+#endif
+ assert(c->cb);
+ c->scheduler->vtable->sched(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ GPR_TIMER_END("grpc_closure_sched", 0);
+}
+
/** Schedule a closure to be run. Does not need to be run from a safe point. */
#ifndef NDEBUG
-void grpc_closure_sched(const char* file, int line, grpc_exec_ctx* exec_ctx,
- grpc_closure* closure, grpc_error* error);
#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \
grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error)
#else
-void grpc_closure_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
- grpc_error* error);
#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \
grpc_closure_sched(exec_ctx, closure, error)
#endif
+#ifndef NDEBUG
+inline void grpc_closure_list_sched(const char* file, int line,
+ grpc_exec_ctx* exec_ctx,
+ grpc_closure_list* list) {
+#else
+inline void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx,
+ grpc_closure_list* list) {
+#endif
+ grpc_closure* c = list->head;
+ while (c != nullptr) {
+ grpc_closure* next = c->next_data.next;
+#ifndef NDEBUG
+ if (c->scheduled) {
+ gpr_log(GPR_ERROR,
+ "Closure already scheduled. (closure: %p, created: [%s:%d], "
+ "previously scheduled at: [%s: %d] run?: %s",
+ c, c->file_created, c->line_created, c->file_initiated,
+ c->line_initiated, c->run ? "true" : "false");
+ abort();
+ }
+ c->scheduled = true;
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = false;
+#endif
+ assert(c->cb);
+ c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ list->head = list->tail = nullptr;
+}
+
/** Schedule all closures in a list to be run. Does not need to be run from a
* safe point. */
#ifndef NDEBUG
-void grpc_closure_list_sched(const char* file, int line,
- grpc_exec_ctx* exec_ctx,
- grpc_closure_list* closure_list);
#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \
grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list)
#else
-void grpc_closure_list_sched(grpc_exec_ctx* exec_ctx,
- grpc_closure_list* closure_list);
#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \
grpc_closure_list_sched(exec_ctx, closure_list)
#endif
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index b28ca3464c..15c009dd77 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -29,14 +29,13 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
-grpc_tracer_flag grpc_combiner_trace =
- GRPC_TRACER_INITIALIZER(false, "combiner");
-
-#define GRPC_COMBINER_TRACE(fn) \
- do { \
- if (GRPC_TRACER_ON(grpc_combiner_trace)) { \
- fn; \
- } \
+grpc_core::TraceFlag grpc_combiner_trace(false, "combiner");
+
+#define GRPC_COMBINER_TRACE(fn) \
+ do { \
+ if (grpc_combiner_trace.enabled()) { \
+ fn; \
+ } \
} while (0)
#define STATE_UNORPHANED 1
@@ -106,7 +105,7 @@ static void start_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) {
#ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \
- if (GRPC_TRACER_ON(grpc_combiner_trace)) { \
+ if (grpc_combiner_trace.enabled()) { \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \
"C:%p %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \
gpr_atm_no_barrier_load(&lock->refs.count), \
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index f8a8b9df62..0c05511331 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -26,10 +26,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/mpscq.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
// Provides serialized access to some resource.
// Each action queued on a combiner is executed serially in a borrowed thread.
// The actual thread executing actions may change over time (but there will only
@@ -65,10 +61,6 @@ grpc_closure_scheduler* grpc_combiner_finally_scheduler(grpc_combiner* lock);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx* exec_ctx);
-extern grpc_tracer_flag grpc_combiner_trace;
-
-#ifdef __cplusplus
-}
-#endif
+extern grpc_core::TraceFlag grpc_combiner_trace;
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 1b0a9e725e..6ab0a6591c 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -26,10 +26,6 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resource_quota.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* An endpoint caps a streaming channel between two communicating processes.
Examples may be: a tcp socket, <stdin+stdout>, or some shared memory. */
@@ -106,8 +102,4 @@ struct grpc_endpoint {
const grpc_endpoint_vtable* vtable;
};
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */
diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h
index 219eea8550..506ffc88b4 100644
--- a/src/core/lib/iomgr/endpoint_pair.h
+++ b/src/core/lib/iomgr/endpoint_pair.h
@@ -21,10 +21,6 @@
#include "src/core/lib/iomgr/endpoint.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct {
grpc_endpoint* client;
grpc_endpoint* server;
@@ -33,8 +29,4 @@ typedef struct {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_channel_args* args);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index 581b903f1a..e6d640c106 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -37,10 +37,9 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
-#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_error_refcount =
- GRPC_TRACER_INITIALIZER(false, "error_refcount");
-#endif
+grpc_core::DebugOnlyTraceFlag grpc_trace_error_refcount(false,
+ "error_refcount");
+grpc_core::DebugOnlyTraceFlag grpc_trace_closure(false, "closure");
static const char* error_int_name(grpc_error_ints key) {
switch (key) {
@@ -130,7 +129,7 @@ bool grpc_error_is_special(grpc_error* err) {
#ifndef NDEBUG
grpc_error* grpc_error_ref(grpc_error* err, const char* file, int line) {
if (grpc_error_is_special(err)) return err;
- if (GRPC_TRACER_ON(grpc_trace_error_refcount)) {
+ if (grpc_trace_error_refcount.enabled()) {
gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d]", err,
gpr_atm_no_barrier_load(&err->atomics.refs.count),
gpr_atm_no_barrier_load(&err->atomics.refs.count) + 1, file, line);
@@ -183,7 +182,7 @@ static void error_destroy(grpc_error* err) {
#ifndef NDEBUG
void grpc_error_unref(grpc_error* err, const char* file, int line) {
if (grpc_error_is_special(err)) return;
- if (GRPC_TRACER_ON(grpc_trace_error_refcount)) {
+ if (grpc_trace_error_refcount.enabled()) {
gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d]", err,
gpr_atm_no_barrier_load(&err->atomics.refs.count),
gpr_atm_no_barrier_load(&err->atomics.refs.count) - 1, file, line);
@@ -216,7 +215,7 @@ static uint8_t get_placement(grpc_error** err, size_t size) {
*err = (grpc_error*)gpr_realloc(
*err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t));
#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_error_refcount)) {
+ if (grpc_trace_error_refcount.enabled()) {
if (*err != orig) {
gpr_log(GPR_DEBUG, "realloc %p -> %p", orig, *err);
}
@@ -329,7 +328,7 @@ grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc,
return GRPC_ERROR_OOM;
}
#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_error_refcount)) {
+ if (grpc_trace_error_refcount.enabled()) {
gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line);
}
#endif
@@ -411,7 +410,7 @@ static grpc_error* copy_error_and_unref(grpc_error* in) {
out = (grpc_error*)gpr_malloc(sizeof(*in) +
new_arena_capacity * sizeof(intptr_t));
#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_error_refcount)) {
+ if (grpc_trace_error_refcount.enabled()) {
gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
}
#endif
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 8d7aea4872..4759ee0791 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -29,19 +29,13 @@
#include "src/core/lib/debug/trace.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/// Opaque representation of an error.
/// See https://github.com/grpc/grpc/blob/master/doc/core/grpc-error.md for a
/// full write up of this object.
typedef struct grpc_error grpc_error;
-#ifndef NDEBUG
-extern grpc_tracer_flag grpc_trace_error_refcount;
-#endif
+extern grpc_core::DebugOnlyTraceFlag grpc_trace_error_refcount;
typedef enum {
/// 'errno' from the operating system
@@ -205,8 +199,4 @@ bool grpc_log_if_error(const char* what, grpc_error* error, const char* file,
#define GRPC_LOG_IF_ERROR(what, error) \
grpc_log_if_error((what), (error), __FILE__, __LINE__)
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h
index d5ccbae9e7..6cb09c2cdb 100644
--- a/src/core/lib/iomgr/error_internal.h
+++ b/src/core/lib/iomgr/error_internal.h
@@ -25,10 +25,6 @@
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/error.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct grpc_linked_error grpc_linked_error;
struct grpc_linked_error {
@@ -62,8 +58,4 @@ struct grpc_error {
bool grpc_error_is_special(struct grpc_error* err);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 918bc6f933..0dda1d924c 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -263,11 +263,13 @@ static grpc_fd* fd_create(int fd, const char* name) {
if (new_fd == nullptr) {
new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd));
+ new_fd->read_closure.Init();
+ new_fd->write_closure.Init();
}
new_fd->fd = fd;
- new_fd->read_closure.Init();
- new_fd->write_closure.Init();
+ new_fd->read_closure->InitEvent();
+ new_fd->write_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -276,7 +278,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
}
#endif
@@ -336,8 +338,8 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
- fd->read_closure.Destroy();
- fd->write_closure.Destroy();
+ fd->read_closure->DestroyEvent();
+ fd->write_closure->DestroyEvent();
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@@ -651,7 +653,7 @@ static grpc_error* do_epoll_wait(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
GRPC_STATS_INC_POLL_EVENTS_RETURNED(exec_ctx, r);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
}
@@ -673,7 +675,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
}
@@ -692,7 +694,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
retry_lock_neighborhood:
gpr_mu_lock(&neighborhood->mu);
gpr_mu_lock(&pollset->mu);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
pollset, worker, kick_state_string(worker->state),
is_reassigning);
@@ -744,7 +746,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (worker->state == UNKICKED && !pollset->shutting_down) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
pollset, worker, kick_state_string(worker->state),
pollset->shutting_down);
@@ -761,7 +763,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
grpc_exec_ctx_invalidate_now(exec_ctx);
}
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR,
"PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
"kicked_without_poller: %d",
@@ -806,7 +808,7 @@ static bool check_neighborhood_for_available_poller(
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
inspect_worker);
}
@@ -817,7 +819,7 @@ static bool check_neighborhood_for_available_poller(
gpr_cv_signal(&inspect_worker->cv);
}
} else {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
}
}
@@ -835,7 +837,7 @@ static bool check_neighborhood_for_available_poller(
} while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
}
inspect->seen_inactive = true;
@@ -857,7 +859,7 @@ static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
grpc_pollset_worker* worker,
grpc_pollset_worker** worker_hdl) {
GPR_TIMER_BEGIN("end_worker", 0);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
}
if (worker_hdl != nullptr) *worker_hdl = nullptr;
@@ -867,7 +869,7 @@ static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->state == UNKICKED) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
}
GPR_ASSERT(worker->next->initialized_cv);
@@ -921,7 +923,7 @@ static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. remove worker");
}
if (EMPTIED == worker_remove(pollset, worker)) {
@@ -993,7 +995,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
GPR_TIMER_BEGIN("pollset_kick", 0);
GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
grpc_error* ret_err = GRPC_ERROR_NONE;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_strvec log;
gpr_strvec_init(&log);
char* tmp;
@@ -1026,7 +1028,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
if (root_worker == nullptr) {
GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
pollset->kicked_without_poller = true;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kicked_without_poller");
}
goto done;
@@ -1034,14 +1036,14 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
grpc_pollset_worker* next_worker = root_worker->next;
if (root_worker->state == KICKED) {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
goto done;
} else if (next_worker->state == KICKED) {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
}
SET_KICK_STATE(next_worker, KICKED);
@@ -1052,7 +1054,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
&g_active_poller)) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
@@ -1060,7 +1062,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
goto done;
} else if (next_worker->state == UNKICKED) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
}
GPR_ASSERT(next_worker->initialized_cv);
@@ -1069,7 +1071,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
goto done;
} else if (next_worker->state == DESIGNATED_POLLER) {
if (root_worker->state != DESIGNATED_POLLER) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(
GPR_ERROR,
" .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
@@ -1083,7 +1085,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
goto done;
} else {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
root_worker);
}
@@ -1099,7 +1101,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
}
} else {
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kicked while waking up");
}
goto done;
@@ -1109,14 +1111,14 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
}
if (specific_worker->state == KICKED) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. specific worker already kicked");
}
goto done;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
}
SET_KICK_STATE(specific_worker, KICKED);
@@ -1124,7 +1126,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
} else if (specific_worker ==
(grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kick active poller");
}
SET_KICK_STATE(specific_worker, KICKED);
@@ -1132,7 +1134,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
goto done;
} else if (specific_worker->initialized_cv) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kick waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
@@ -1140,7 +1142,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
goto done;
} else {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, " .. kick non-waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.h b/src/core/lib/iomgr/ev_epoll1_linux.h
index 3e66747f6c..9a1b96bd45 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.h
+++ b/src/core/lib/iomgr/ev_epoll1_linux.h
@@ -22,16 +22,8 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/port.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
// a polling engine that utilizes a singleton epoll set and turnstile polling
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index bfd2ac4326..62643df697 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -59,10 +59,8 @@
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
-#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_pollable_refcount =
- GRPC_TRACER_INITIALIZER(false, "pollable_refcount");
-#endif
+grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
+ "pollable_refcount");
/*******************************************************************************
* pollable Declarations
@@ -263,7 +261,7 @@ static gpr_mu fd_freelist_mu;
unref_by(ec, fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG,
"FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
@@ -288,8 +286,8 @@ static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
fd->freelist_next = fd_freelist;
fd_freelist = fd;
- fd->read_closure.Destroy();
- fd->write_closure.Destroy();
+ fd->read_closure->DestroyEvent();
+ fd->write_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
}
@@ -297,7 +295,7 @@ static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
#ifndef NDEBUG
static void unref_by(grpc_exec_ctx* exec_ctx, grpc_fd* fd, int n,
const char* reason, const char* file, int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG,
"FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
@@ -342,6 +340,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
if (new_fd == nullptr) {
new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd));
+ new_fd->read_closure.Init();
+ new_fd->write_closure.Init();
}
gpr_mu_init(&new_fd->pollable_mu);
@@ -349,8 +349,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
- new_fd->read_closure.Init();
- new_fd->write_closure.Init();
+ new_fd->read_closure->InitEvent();
+ new_fd->write_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -360,7 +360,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
}
#endif
@@ -483,7 +483,7 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
static pollable* pollable_ref(pollable* p) {
#else
static pollable* pollable_ref(pollable* p, int line, const char* reason) {
- if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) {
+ if (grpc_trace_pollable_refcount.enabled()) {
int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
"POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
@@ -498,7 +498,7 @@ static void pollable_unref(pollable* p) {
#else
static void pollable_unref(pollable* p, int line, const char* reason) {
if (p == nullptr) return;
- if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) {
+ if (grpc_trace_pollable_refcount.enabled()) {
int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
"POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
@@ -516,7 +516,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
static const char* err_desc = "pollable_add_fd";
const int epfd = p->epfd;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
@@ -558,7 +558,7 @@ static void pollset_global_shutdown(void) {
/* pollset->mu must be held while calling this function */
static void pollset_maybe_finish_shutdown(grpc_exec_ctx* exec_ctx,
grpc_pollset* pollset) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
"PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) "
"rw=%p (target:NULL) cpsc=%d (target:0)",
@@ -581,14 +581,14 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx,
grpc_core::mu_guard lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
}
GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
return GRPC_ERROR_NONE;
}
if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
}
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
@@ -597,7 +597,7 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx,
}
if (specific_worker == p->root_worker) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
}
specific_worker->kicked = true;
@@ -606,7 +606,7 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx,
}
if (specific_worker->initialized_cv) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
}
specific_worker->kicked = true;
@@ -621,7 +621,7 @@ static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx,
static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
grpc_pollset_worker* specific_worker) {
GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
"PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p",
pollset, specific_worker,
@@ -631,7 +631,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
if (specific_worker == nullptr) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
if (pollset->root_worker == nullptr) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset);
}
GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
@@ -657,7 +657,7 @@ static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next);
}
} else {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset);
}
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
@@ -765,7 +765,7 @@ static grpc_error* pollable_process_events(grpc_exec_ctx* exec_ctx,
struct epoll_event* ev = &pollable_obj->events[n];
void* data_ptr = ev->data.ptr;
if (1 & (intptr_t)data_ptr) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
}
append_error(&error,
@@ -777,7 +777,7 @@ static grpc_error* pollable_process_events(grpc_exec_ctx* exec_ctx,
bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
@@ -805,7 +805,7 @@ static grpc_error* pollable_epoll(grpc_exec_ctx* exec_ctx, pollable* p,
grpc_millis deadline) {
int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
char* desc = pollable_desc(p);
gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout);
gpr_free(desc);
@@ -825,7 +825,7 @@ static grpc_error* pollable_epoll(grpc_exec_ctx* exec_ctx, pollable* p,
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r);
}
@@ -893,7 +893,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
gpr_mu_unlock(&pollset->mu);
- if (GRPC_TRACER_ON(grpc_polling_trace) &&
+ if (grpc_polling_trace.enabled() &&
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
worker->pollable_obj, worker,
@@ -902,18 +902,18 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
while (do_poll && worker->pollable_obj->root_worker != worker) {
if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
worker->pollable_obj, worker);
}
do_poll = false;
} else if (worker->kicked) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
worker->pollable_obj, worker);
}
do_poll = false;
- } else if (GRPC_TRACER_ON(grpc_polling_trace) &&
+ } else if (grpc_polling_trace.enabled() &&
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
worker->pollable_obj, worker);
@@ -984,7 +984,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
#ifndef NDEBUG
WORKER_PTR->originator = gettid();
#endif
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
"PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR
" kwp=%d pollable=%p",
@@ -1027,7 +1027,7 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked(
grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* fd) {
static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd";
grpc_error* error = GRPC_ERROR_NONE;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p (%d); transition pollable from empty to fd",
pollset, fd, fd->fd);
@@ -1043,7 +1043,7 @@ static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked(
grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* and_add_fd) {
static const char* err_desc = "pollset_transition_pollable_from_fd_to_multi";
grpc_error* error = GRPC_ERROR_NONE;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
@@ -1193,7 +1193,7 @@ static void pollset_set_unref(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss) {
static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
grpc_fd* fd) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -1217,7 +1217,7 @@ static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
grpc_fd* fd) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
}
pss = pss_lock_adam(pss);
@@ -1238,7 +1238,7 @@ static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* pss, grpc_pollset* ps) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
}
pss = pss_lock_adam(pss);
@@ -1289,7 +1289,7 @@ static grpc_error* add_fds_to_pollsets(grpc_exec_ctx* exec_ctx, grpc_fd** fds,
static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* pss, grpc_pollset* ps) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -1326,7 +1326,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* a,
grpc_pollset_set* b) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -1360,7 +1360,7 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
if (b_size > a_size) {
GPR_SWAP(grpc_pollset_set*, a, b);
}
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a);
}
gpr_ref(&a->refs);
@@ -1461,10 +1461,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
return nullptr;
}
-#ifndef NDEBUG
- grpc_register_tracer(&grpc_trace_pollable_refcount);
-#endif
-
fd_global_init();
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
diff --git a/src/core/lib/iomgr/ev_epollex_linux.h b/src/core/lib/iomgr/ev_epollex_linux.h
index 22b536c7d4..ffa7fc7f32 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.h
+++ b/src/core/lib/iomgr/ev_epollex_linux.h
@@ -22,15 +22,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/port.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 5b4a7ba19c..12c8483b8e 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -54,9 +54,9 @@
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
-#define GRPC_POLLING_TRACE(...) \
- if (GRPC_TRACER_ON(grpc_polling_trace)) { \
- gpr_log(GPR_INFO, __VA_ARGS__); \
+#define GRPC_POLLING_TRACE(...) \
+ if (grpc_polling_trace.enabled()) { \
+ gpr_log(GPR_INFO, __VA_ARGS__); \
}
static int grpc_wakeup_signal = -1;
@@ -289,7 +289,7 @@ static void pi_unref(grpc_exec_ctx* exec_ctx, polling_island* pi);
#ifndef NDEBUG
static void pi_add_ref_dbg(polling_island* pi, const char* reason,
const char* file, int line) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
gpr_log(GPR_DEBUG,
"Add ref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
@@ -301,7 +301,7 @@ static void pi_add_ref_dbg(polling_island* pi, const char* reason,
static void pi_unref_dbg(grpc_exec_ctx* exec_ctx, polling_island* pi,
const char* reason, const char* file, int line) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
gpr_log(GPR_DEBUG,
"Unref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
@@ -733,7 +733,7 @@ static gpr_mu fd_freelist_mu;
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG,
"FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
@@ -750,7 +750,7 @@ static void ref_by(grpc_fd* fd, int n) {
#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG,
"FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
@@ -767,8 +767,8 @@ static void unref_by(grpc_fd* fd, int n) {
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
- fd->read_closure.Destroy();
- fd->write_closure.Destroy();
+ fd->read_closure->DestroyEvent();
+ fd->write_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -819,6 +819,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
if (new_fd == nullptr) {
new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->po.mu);
+ new_fd->read_closure.Init();
+ new_fd->write_closure.Init();
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
@@ -833,8 +835,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
new_fd->orphaned = false;
- new_fd->read_closure.Init();
- new_fd->write_closure.Init();
+ new_fd->read_closure->InitEvent();
+ new_fd->write_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.h b/src/core/lib/iomgr/ev_epollsig_linux.h
index ca68595734..5b8aba9d9f 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.h
+++ b/src/core/lib/iomgr/ev_epollsig_linux.h
@@ -22,10 +22,6 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/port.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
const grpc_event_engine_vtable* grpc_init_epollsig_linux(bool explicit_request);
#ifdef GRPC_LINUX_EPOLL
@@ -34,8 +30,4 @@ void* grpc_pollset_get_polling_island(grpc_pollset* ps);
bool grpc_are_polling_islands_equal(void* p, void* q);
#endif /* defined(GRPC_LINUX_EPOLL) */
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLSIG_LINUX_H */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index f8b9629462..e32e1ba42a 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -288,7 +288,7 @@ cv_fd_table g_cvfds;
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG,
"FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
@@ -305,7 +305,7 @@ static void ref_by(grpc_fd* fd, int n) {
#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
+ if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG,
"FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
@@ -992,7 +992,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
}
@@ -1016,7 +1016,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset);
}
work_combine_error(
@@ -1026,7 +1026,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
if (watchers[i].fd == nullptr) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0, nullptr);
} else {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset,
pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
(pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
@@ -1382,7 +1382,7 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_ref(&g_cvfds.pollcount);
gpr_thd_options_set_detached(&opt);
- GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
+ GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt));
return pargs;
}
diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h
index 626e95bc8f..f6bc624d4f 100644
--- a/src/core/lib/iomgr/ev_poll_posix.h
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -21,15 +21,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request);
const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 076d2e6bab..031c97564a 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -36,13 +36,9 @@
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/support/env.h"
-grpc_tracer_flag grpc_polling_trace =
- GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */
-
-#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount =
- GRPC_TRACER_INITIALIZER(false, "fd_refcount");
-#endif
+grpc_core::TraceFlag grpc_polling_trace(false,
+ "polling"); /* Disabled by default */
+grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
/** Default poll() function - a pointer so that it can be overridden by some
* tests */
@@ -63,8 +59,6 @@ typedef struct {
namespace {
-extern "C" {
-
grpc_poll_function_type real_poll_function;
int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
@@ -76,7 +70,6 @@ int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
return -1;
}
}
-} // extern "C"
const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
if (!explicit_request) {
@@ -153,8 +146,6 @@ const grpc_event_engine_vtable* grpc_get_event_engine_test_only() {
const char* grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
void grpc_event_engine_init(void) {
- grpc_register_tracer(&grpc_polling_trace);
-
char* s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s == nullptr) {
s = gpr_strdup("all");
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index d719b8f3c9..16fa10ca56 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -27,11 +27,7 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */
+extern grpc_core::TraceFlag grpc_polling_trace; /* Disabled by default */
typedef struct grpc_fd grpc_fd;
@@ -162,8 +158,4 @@ extern grpc_poll_function_type grpc_poll_function;
void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*);
const grpc_event_engine_vtable* grpc_get_event_engine_test_only();
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_windows.cc b/src/core/lib/iomgr/ev_windows.cc
index c24dfaeaf7..697697d0b0 100644
--- a/src/core/lib/iomgr/ev_windows.cc
+++ b/src/core/lib/iomgr/ev_windows.cc
@@ -22,7 +22,7 @@
#include "src/core/lib/debug/trace.h"
-grpc_tracer_flag grpc_polling_trace =
- GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */
+grpc_core::TraceFlag grpc_polling_trace(false,
+ "polling"); /* Disabled by default */
#endif // GRPC_WINSOCK_SOCKET
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 27c769a7ed..1777456342 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -25,9 +25,6 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/profiling/timers.h"
-#define GRPC_START_TIME_UPDATE_INTERVAL 10000
-extern "C" grpc_tracer_flag grpc_timer_check_trace;
-
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx* exec_ctx) {
if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
if (exec_ctx->check_ready_to_finish(exec_ctx,
@@ -63,7 +60,7 @@ static void exec_ctx_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
grpc_error* error) {
#ifndef NDEBUG
closure->scheduled = false;
- if (GRPC_TRACER_ON(grpc_trace_closure)) {
+ if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]",
closure, closure->file_created, closure->line_created,
closure->run ? "run" : "scheduled", closure->file_initiated,
@@ -72,7 +69,7 @@ static void exec_ctx_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
#endif
closure->cb(exec_ctx, closure->cb_arg, error);
#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_closure)) {
+ if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "closure %p finished", closure);
}
#endif
@@ -107,49 +104,16 @@ static void exec_ctx_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
-/* This time pair is not entirely thread-safe as store/load of tv_sec and
- * tv_nsec are performed separately. However g_start_time do not need to have
- * sub-second precision, so it is ok if the value of tv_nsec is off in this
- * case. */
-typedef struct time_atm_pair {
- gpr_atm tv_sec;
- gpr_atm tv_nsec;
-} time_atm_pair;
-
-static time_atm_pair
- g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the
- // last enum value in
- // gpr_clock_type
-static grpc_millis g_last_start_time_update;
-
-static gpr_timespec timespec_from_time_atm_pair(const time_atm_pair* src,
- gpr_clock_type clock_type) {
- gpr_timespec time;
- time.tv_nsec = (int32_t)gpr_atm_no_barrier_load(&src->tv_nsec);
- time.tv_sec = (int64_t)gpr_atm_no_barrier_load(&src->tv_sec);
- time.clock_type = clock_type;
- return time;
-}
-
-static void time_atm_pair_store(time_atm_pair* dst, const gpr_timespec src) {
- gpr_atm_no_barrier_store(&dst->tv_sec, src.tv_sec);
- gpr_atm_no_barrier_store(&dst->tv_nsec, src.tv_nsec);
-}
+static gpr_timespec g_start_time;
void grpc_exec_ctx_global_init(void) {
- for (int i = 0; i < GPR_TIMESPAN; i++) {
- time_atm_pair_store(&g_start_time[i], gpr_now((gpr_clock_type)i));
- }
- // allows uniform treatment in conversion functions
- time_atm_pair_store(&g_start_time[GPR_TIMESPAN], gpr_time_0(GPR_TIMESPAN));
+ g_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}
void grpc_exec_ctx_global_shutdown(void) {}
static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
- gpr_timespec start_time =
- timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type);
- ts = gpr_time_sub(ts, start_time);
+ ts = gpr_time_sub(ts, g_start_time);
double x =
GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
if (x < 0) return 0;
@@ -158,9 +122,7 @@ static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
}
static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
- gpr_timespec start_time =
- timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type);
- ts = gpr_time_sub(ts, start_time);
+ ts = gpr_time_sub(ts, g_start_time);
double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
(double)ts.tv_nsec / GPR_NS_PER_MS +
(double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC;
@@ -195,41 +157,18 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
if (clock_type == GPR_TIMESPAN) {
return gpr_time_from_millis(millis, GPR_TIMESPAN);
}
- gpr_timespec start_time =
- timespec_from_time_atm_pair(&g_start_time[clock_type], clock_type);
- return gpr_time_add(start_time, gpr_time_from_millis(millis, GPR_TIMESPAN));
+ return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type),
+ gpr_time_from_millis(millis, GPR_TIMESPAN));
}
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) {
- return timespec_to_atm_round_down(ts);
+ return timespec_to_atm_round_down(
+ gpr_convert_clock_type(ts, g_start_time.clock_type));
}
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) {
- return timespec_to_atm_round_up(ts);
-}
-
-void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx) {
- grpc_millis now = grpc_exec_ctx_now(exec_ctx);
- grpc_millis last_start_time_update =
- gpr_atm_no_barrier_load(&g_last_start_time_update);
-
- if (now > last_start_time_update &&
- now - last_start_time_update > GRPC_START_TIME_UPDATE_INTERVAL) {
- /* Get the current system time and subtract \a now from it, where \a now is
- * the relative time from grpc_init() from monotonic clock. This calibrates
- * the time when grpc_exec_ctx_global_init was called based on current
- * system clock. */
- gpr_atm_no_barrier_store(&g_last_start_time_update, now);
- gpr_timespec real_now = gpr_now(GPR_CLOCK_REALTIME);
- gpr_timespec real_start_time =
- gpr_time_sub(real_now, gpr_time_from_millis(now, GPR_TIMESPAN));
- time_atm_pair_store(&g_start_time[GPR_CLOCK_REALTIME], real_start_time);
-
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "Update realtime clock start time: %" PRId64 "s %dns",
- real_start_time.tv_sec, real_start_time.tv_nsec);
- }
- }
+ return timespec_to_atm_round_up(
+ gpr_convert_clock_type(ts, g_start_time.clock_type));
}
static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 6035e08361..b415d2c255 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -24,10 +24,6 @@
#include "src/core/lib/iomgr/closure.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef gpr_atm grpc_millis;
#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX
@@ -124,10 +120,4 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
-void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx);
-
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 6097d66024..fabdbdf934 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -51,8 +51,7 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
GPR_TLS_DECL(g_this_thread_state);
-static grpc_tracer_flag executor_trace =
- GRPC_TRACER_INITIALIZER(false, "executor");
+grpc_core::TraceFlag executor_trace(false, "executor");
static void executor_thread(void* arg);
@@ -63,7 +62,7 @@ static size_t run_closures(grpc_exec_ctx* exec_ctx, grpc_closure_list list) {
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
grpc_error* error = c->error_data.error;
- if (GRPC_TRACER_ON(executor_trace)) {
+ if (executor_trace.enabled()) {
#ifndef NDEBUG
gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
c->file_created, c->line_created);
@@ -105,8 +104,8 @@ void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
- &opt);
+ gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread,
+ &g_thread_state[0], &opt);
} else {
if (cur_threads == 0) return;
for (size_t i = 0; i < g_max_threads; i++) {
@@ -134,7 +133,6 @@ void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) {
}
void grpc_executor_init(grpc_exec_ctx* exec_ctx) {
- grpc_register_tracer(&executor_trace);
gpr_atm_no_barrier_store(&g_cur_threads, 0);
grpc_executor_set_threading(exec_ctx, true);
}
@@ -152,7 +150,7 @@ static void executor_thread(void* arg) {
size_t subtract_depth = 0;
for (;;) {
- if (GRPC_TRACER_ON(executor_trace)) {
+ if (executor_trace.enabled()) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
(int)(ts - g_thread_state), subtract_depth);
}
@@ -163,7 +161,7 @@ static void executor_thread(void* arg) {
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
if (ts->shutdown) {
- if (GRPC_TRACER_ON(executor_trace)) {
+ if (executor_trace.enabled()) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown",
(int)(ts - g_thread_state));
}
@@ -174,7 +172,7 @@ static void executor_thread(void* arg) {
grpc_closure_list exec = ts->elems;
ts->elems = GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
- if (GRPC_TRACER_ON(executor_trace)) {
+ if (executor_trace.enabled()) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
}
@@ -196,7 +194,7 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
retry_push = false;
size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
if (cur_thread_count == 0) {
- if (GRPC_TRACER_ON(executor_trace)) {
+ if (executor_trace.enabled()) {
#ifndef NDEBUG
gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
closure, closure->file_created, closure->line_created);
@@ -217,7 +215,7 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
bool try_new_thread;
for (;;) {
- if (GRPC_TRACER_ON(executor_trace)) {
+ if (executor_trace.enabled()) {
#ifndef NDEBUG
gpr_log(
GPR_DEBUG,
@@ -265,8 +263,8 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
- &g_thread_state[cur_thread_count], &opt);
+ gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor",
+ executor_thread, &g_thread_state[cur_thread_count], &opt);
}
gpr_spinlock_unlock(&g_adding_thread_lock);
}
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 8418ace06e..d349083eeb 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -21,10 +21,6 @@
#include "src/core/lib/iomgr/closure.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef enum {
GRPC_EXECUTOR_SHORT,
GRPC_EXECUTOR_LONG
@@ -49,8 +45,4 @@ bool grpc_executor_is_threaded();
grpc_executor_shutdown */
void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool enable);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
new file mode 100644
index 0000000000..f3cfd141b6
--- /dev/null
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_FORK
+
+#include <string.h>
+
+#include <grpc/fork.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/timer_manager.h"
+#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/fork.h"
+#include "src/core/lib/support/thd_internal.h"
+#include "src/core/lib/surface/init.h"
+
+/*
+ * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
+ * AROUND VERY SPECIFIC USE CASES.
+ */
+
+void grpc_prefork() {
+ if (!grpc_fork_support_enabled()) {
+ gpr_log(GPR_ERROR,
+ "Fork support not enabled; try running with the "
+ "environment variable GRPC_ENABLE_FORK_SUPPORT=1");
+ return;
+ }
+ if (grpc_is_initialized()) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_timer_manager_set_threading(false);
+ grpc_executor_set_threading(&exec_ctx, false);
+ grpc_exec_ctx_finish(&exec_ctx);
+ if (!gpr_await_threads(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(3, GPR_TIMESPAN)))) {
+ gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!");
+ }
+ }
+}
+
+void grpc_postfork_parent() {
+ if (grpc_is_initialized()) {
+ grpc_timer_manager_set_threading(true);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_executor_set_threading(&exec_ctx, true);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+
+void grpc_postfork_child() {
+ if (grpc_is_initialized()) {
+ grpc_timer_manager_set_threading(true);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_executor_set_threading(&exec_ctx, true);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+
+void grpc_fork_handlers_auto_register() {
+ if (grpc_fork_support_enabled()) {
+#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
+ pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child);
+#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
+ }
+}
+
+#endif // GRPC_POSIX_FORK
diff --git a/src/core/lib/iomgr/fork_windows.cc b/src/core/lib/iomgr/fork_windows.cc
new file mode 100644
index 0000000000..f9986f33c7
--- /dev/null
+++ b/src/core/lib/iomgr/fork_windows.cc
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifndef GRPC_POSIX_FORK
+
+#include <grpc/fork.h>
+#include <grpc/support/log.h>
+
+/*
+ * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
+ * AROUND VERY SPECIFIC USE CASES.
+ */
+
+void grpc_prefork() { gpr_log(GPR_ERROR, "Forking not supported on Windows"); }
+
+void grpc_postfork_parent() {}
+
+void grpc_postfork_child() {}
+
+void grpc_fork_handlers_auto_register() {}
+
+#endif // GRPC_POSIX_FORK
diff --git a/src/core/lib/iomgr/gethostname.h b/src/core/lib/iomgr/gethostname.h
index 2e65b5ffbf..9f10b4afa7 100644
--- a/src/core/lib/iomgr/gethostname.h
+++ b/src/core/lib/iomgr/gethostname.h
@@ -19,16 +19,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H
#define GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H
-#ifdef __cplusplus
-extern "C" {
-#endif
-
// Returns the hostname of the local machine.
// Caller takes ownership of result.
char* grpc_gethostname();
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H */
diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h
index d112c50538..0e9c3481f7 100644
--- a/src/core/lib/iomgr/iocp_windows.h
+++ b/src/core/lib/iomgr/iocp_windows.h
@@ -27,10 +27,6 @@
#include "src/core/lib/iomgr/socket_windows.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef enum {
GRPC_IOCP_WORK_WORK,
GRPC_IOCP_WORK_TIMEOUT,
@@ -45,10 +41,6 @@ void grpc_iocp_flush(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket*);
-#ifdef __cplusplus
-}
-#endif
-
#endif
#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index d1549c8c63..2f00c0343d 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -22,10 +22,6 @@
#include <grpc/impl/codegen/exec_ctx_fwd.h>
#include "src/core/lib/iomgr/port.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/** Initializes the iomgr. */
void grpc_iomgr_init(grpc_exec_ctx* exec_ctx);
@@ -36,8 +32,4 @@ void grpc_iomgr_start(grpc_exec_ctx* exec_ctx);
* exec_ctx. */
void grpc_iomgr_shutdown(grpc_exec_ctx* exec_ctx);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */
diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h
index b818c68da0..20b3cb70d0 100644
--- a/src/core/lib/iomgr/iomgr_internal.h
+++ b/src/core/lib/iomgr/iomgr_internal.h
@@ -23,10 +23,6 @@
#include "src/core/lib/iomgr/iomgr.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct grpc_iomgr_object {
char* name;
struct grpc_iomgr_object* next;
@@ -44,8 +40,4 @@ void grpc_iomgr_platform_shutdown(void);
bool grpc_iomgr_abort_on_leaks(void);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index f5875a247e..f8f6fe2353 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -28,7 +28,6 @@
void grpc_iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
- grpc_register_tracer(&grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
diff --git a/src/core/lib/iomgr/iomgr_uv.cc b/src/core/lib/iomgr/iomgr_uv.cc
index df5d23af3b..b8a10f2ae8 100644
--- a/src/core/lib/iomgr/iomgr_uv.cc
+++ b/src/core/lib/iomgr/iomgr_uv.cc
@@ -31,7 +31,7 @@ gpr_thd_id g_init_thread;
void grpc_iomgr_platform_init(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_global_init();
- grpc_register_tracer(&grpc_tcp_trace);
+
grpc_executor_set_threading(&exec_ctx, false);
g_init_thread = gpr_thd_currentid();
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h
index bc42ca8c1c..3b4daaa73b 100644
--- a/src/core/lib/iomgr/iomgr_uv.h
+++ b/src/core/lib/iomgr/iomgr_uv.h
@@ -23,18 +23,10 @@
#include <grpc/support/thd.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* The thread ID of the thread on which grpc was initialized. Used to verify
* that all calls into libuv are made on that same thread */
extern gpr_thd_id g_init_thread;
-#ifdef __cplusplus
-}
-#endif
-
#ifdef GRPC_UV_THREAD_CHECK
#define GRPC_UV_ASSERT_SAME_THREAD() \
GPR_ASSERT(gpr_thd_currentid() == g_init_thread)
diff --git a/src/core/lib/iomgr/load_file.h b/src/core/lib/iomgr/load_file.h
index 5b367c189d..a7336527ce 100644
--- a/src/core/lib/iomgr/load_file.h
+++ b/src/core/lib/iomgr/load_file.h
@@ -25,17 +25,9 @@
#include "src/core/lib/iomgr/error.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Loads the content of a file into a slice. add_null_terminator will add
a NULL terminator if non-zero. */
grpc_error* grpc_load_file(const char* filename, int add_null_terminator,
grpc_slice* slice);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_LOAD_FILE_H */
diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc
index 40e2ed6219..f0e798e8d8 100644
--- a/src/core/lib/iomgr/lockfree_event.cc
+++ b/src/core/lib/iomgr/lockfree_event.cc
@@ -22,7 +22,7 @@
#include "src/core/lib/debug/trace.h"
-extern grpc_tracer_flag grpc_polling_trace;
+extern grpc_core::TraceFlag grpc_polling_trace;
/* 'state' holds the to call when the fd is readable or writable respectively.
It can contain one of the following values:
@@ -57,7 +57,9 @@ extern grpc_tracer_flag grpc_polling_trace;
namespace grpc_core {
-LockfreeEvent::LockfreeEvent() {
+LockfreeEvent::LockfreeEvent() { InitEvent(); }
+
+void LockfreeEvent::InitEvent() {
/* Perform an atomic store to start the state machine.
Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed
@@ -67,7 +69,7 @@ LockfreeEvent::LockfreeEvent() {
gpr_atm_no_barrier_store(&state_, kClosureNotReady);
}
-LockfreeEvent::~LockfreeEvent() {
+void LockfreeEvent::DestroyEvent() {
gpr_atm curr;
do {
curr = gpr_atm_no_barrier_load(&state_);
@@ -86,7 +88,7 @@ LockfreeEvent::~LockfreeEvent() {
void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
(void*)curr, closure);
}
@@ -153,7 +155,7 @@ bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx,
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::SetShutdown: %p curr=%p err=%s",
&state_, (void*)curr, grpc_error_string(shutdown_err));
}
@@ -202,7 +204,7 @@ void LockfreeEvent::SetReady(grpc_exec_ctx* exec_ctx) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::SetReady: %p curr=%p", &state_,
(void*)curr);
}
diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h
index c667dcd3bc..aec67a3399 100644
--- a/src/core/lib/iomgr/lockfree_event.h
+++ b/src/core/lib/iomgr/lockfree_event.h
@@ -30,11 +30,16 @@ namespace grpc_core {
class LockfreeEvent {
public:
LockfreeEvent();
- ~LockfreeEvent();
LockfreeEvent(const LockfreeEvent&) = delete;
LockfreeEvent& operator=(const LockfreeEvent&) = delete;
+ // These methods are used to initialize and destroy the internal state. These
+ // cannot be done in constructor and destructor because SetReady may be called
+ // when the event is destroyed and put in a freelist.
+ void InitEvent();
+ void DestroyEvent();
+
bool IsShutdown() const {
return (gpr_atm_no_barrier_load(&state_) & kShutdownBit) != 0;
}
diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h
index 867e085153..dbe579e60d 100644
--- a/src/core/lib/iomgr/polling_entity.h
+++ b/src/core/lib/iomgr/polling_entity.h
@@ -22,10 +22,6 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef enum grpc_pollset_tag {
GRPC_POLLS_NONE,
GRPC_POLLS_POLLSET,
@@ -68,8 +64,5 @@ void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst);
-#ifdef __cplusplus
-}
-#endif
#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h
index c99b930e8e..d5d78f3101 100644
--- a/src/core/lib/iomgr/pollset.h
+++ b/src/core/lib/iomgr/pollset.h
@@ -25,13 +25,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#ifndef NDEBUG
-extern grpc_tracer_flag grpc_trace_fd_refcount;
-#endif
+extern grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount;
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
@@ -84,8 +78,4 @@ grpc_error* grpc_pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
grpc_pollset_worker* specific_worker)
GRPC_MUST_USE_RESULT;
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */
diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h
index 0167a50a56..089c15cc94 100644
--- a/src/core/lib/iomgr/pollset_set.h
+++ b/src/core/lib/iomgr/pollset_set.h
@@ -21,10 +21,6 @@
#include "src/core/lib/iomgr/pollset.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* A grpc_pollset_set is a set of pollsets that are interested in an
action. Adding a pollset to a pollset_set automatically adds any
fd's (etc) that have been registered with the set_set to that pollset.
@@ -48,8 +44,4 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* bag,
grpc_pollset_set* item);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */
diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc
index 1d54942c1d..16132f3a80 100644
--- a/src/core/lib/iomgr/pollset_uv.cc
+++ b/src/core/lib/iomgr/pollset_uv.cc
@@ -34,10 +34,7 @@
#include "src/core/lib/debug/trace.h"
-#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount =
- GRPC_TRACER_INITIALIZER(false, "fd_refcount");
-#endif
+grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
struct grpc_pollset {
uv_timer_t* timer;
diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h
index 5cc9faf4ff..566c110ca6 100644
--- a/src/core/lib/iomgr/pollset_uv.h
+++ b/src/core/lib/iomgr/pollset_uv.h
@@ -19,17 +19,9 @@
#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H
#define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H
-#ifdef __cplusplus
-extern "C" {
-#endif
-
extern int grpc_pollset_work_run_loop;
void grpc_pollset_global_init(void);
void grpc_pollset_global_shutdown(void);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */
diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc
index 5998b3f5bc..95dd7d7ddd 100644
--- a/src/core/lib/iomgr/pollset_windows.cc
+++ b/src/core/lib/iomgr/pollset_windows.cc
@@ -30,10 +30,7 @@
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
-#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount =
- GRPC_TRACER_INITIALIZER(false, "fd_refcount");
-#endif
+grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
gpr_mu grpc_polling_mu;
static grpc_pollset_worker* g_active_poller;
diff --git a/src/core/lib/iomgr/pollset_windows.h b/src/core/lib/iomgr/pollset_windows.h
index f6da9da601..93fe7d669b 100644
--- a/src/core/lib/iomgr/pollset_windows.h
+++ b/src/core/lib/iomgr/pollset_windows.h
@@ -26,10 +26,6 @@
#ifdef GRPC_WINSOCK_SOCKET
#include "src/core/lib/iomgr/socket_windows.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. A Windows "pollset" is merely a mutex
used to synchronize with the IOCP, and workers are condition variables
@@ -67,10 +63,6 @@ struct grpc_pollset {
void grpc_pollset_global_init(void);
void grpc_pollset_global_shutdown(void);
-#ifdef __cplusplus
-}
-#endif
-
#endif
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 1cc6d98491..9fae8c0052 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -30,6 +30,7 @@
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
@@ -59,6 +60,7 @@
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
+#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_HOST_NAME_MAX 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
@@ -90,6 +92,7 @@
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_MSG_IOVLEN_TYPE int
+#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
@@ -103,6 +106,7 @@
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index 847e10f177..5105020404 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -25,10 +25,6 @@
#define GRPC_MAX_SOCKADDR_SIZE 128
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct {
char addr[GRPC_MAX_SOCKADDR_SIZE];
size_t len;
@@ -56,8 +52,4 @@ extern grpc_error* (*grpc_blocking_resolve_address)(
const char* name, const char* default_port,
grpc_resolved_addresses** addresses);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 9a44fa203c..ccd8d9f379 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -31,8 +31,7 @@
#include "src/core/lib/iomgr/combiner.h"
-grpc_tracer_flag grpc_resource_quota_trace =
- GRPC_TRACER_INITIALIZER(false, "resource_quota");
+grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
#define MEMORY_USAGE_ESTIMATION_MAX 65536
@@ -293,7 +292,7 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx,
while ((resource_user = rulist_pop_head(resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION))) {
gpr_mu_lock(&resource_user->mu);
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG,
"RQ: check allocation for user %p shutdown=%" PRIdPTR
" free_pool=%" PRId64,
@@ -319,14 +318,14 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx,
resource_user->free_pool = 0;
resource_quota->free_pool -= amt;
rq_update_estimate(resource_quota);
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG,
"RQ %s %s: grant alloc %" PRId64
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
}
- } else if (GRPC_TRACER_ON(grpc_resource_quota_trace) &&
+ } else if (grpc_resource_quota_trace.enabled() &&
resource_user->free_pool >= 0) {
gpr_log(GPR_DEBUG, "RQ %s %s: discard already satisfied alloc request",
resource_quota->name, resource_user->name);
@@ -357,7 +356,7 @@ static bool rq_reclaim_from_per_user_free_pool(
resource_user->free_pool = 0;
resource_quota->free_pool += amt;
rq_update_estimate(resource_quota);
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG,
"RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
" bytes; rq_free_pool -> %" PRId64,
@@ -381,7 +380,7 @@ static bool rq_reclaim(grpc_exec_ctx* exec_ctx,
: GRPC_RULIST_RECLAIMER_BENIGN;
grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
if (resource_user == nullptr) return false;
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation",
resource_quota->name, resource_user->name,
destructive ? "destructive" : "benign");
@@ -515,7 +514,7 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, void* ru,
}
static void ru_shutdown(grpc_exec_ctx* exec_ctx, void* ru, grpc_error* error) {
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG, "RU shutdown %p", ru);
}
grpc_resource_user* resource_user = (grpc_resource_user*)ru;
@@ -813,7 +812,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx* exec_ctx,
ru_ref_by(resource_user, (gpr_atm)size);
resource_user->free_pool -= (int64_t)size;
resource_user->outstanding_allocations += (int64_t)size;
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->free_pool);
@@ -838,7 +837,7 @@ void grpc_resource_user_free(grpc_exec_ctx* exec_ctx,
gpr_mu_lock(&resource_user->mu);
bool was_zero_or_negative = resource_user->free_pool <= 0;
resource_user->free_pool += (int64_t)size;
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->free_pool);
@@ -867,7 +866,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx* exec_ctx,
void grpc_resource_user_finish_reclamation(grpc_exec_ctx* exec_ctx,
grpc_resource_user* resource_user) {
- if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
@@ -896,10 +895,3 @@ void grpc_resource_user_alloc_slices(
grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user,
count * length, &slice_allocator->on_allocated);
}
-
-grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx* exec_ctx,
- grpc_resource_user* resource_user,
- size_t size) {
- grpc_resource_user_alloc(exec_ctx, resource_user, size, nullptr);
- return ru_slice_create(resource_user, size);
-}
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index fcdf9c2de5..787370307a 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -24,10 +24,6 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/** \file Tracks resource usage against a pool.
The current implementation tracks only memory usage, but in the future
@@ -65,7 +61,7 @@ extern "C" {
maintain lists of users (which users arrange to leave before they are
destroyed) */
-extern grpc_tracer_flag grpc_resource_quota_trace;
+extern grpc_core::TraceFlag grpc_resource_quota_trace;
grpc_resource_quota* grpc_resource_quota_ref_internal(
grpc_resource_quota* resource_quota);
@@ -154,8 +150,4 @@ grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx* exec_ctx,
grpc_resource_user* resource_user,
size_t size);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
diff --git a/src/core/lib/iomgr/sockaddr_utils.cc b/src/core/lib/iomgr/sockaddr_utils.cc
index 3477fb52cd..0c0a2fe5b2 100644
--- a/src/core/lib/iomgr/sockaddr_utils.cc
+++ b/src/core/lib/iomgr/sockaddr_utils.cc
@@ -148,7 +148,7 @@ int grpc_sockaddr_to_string(char** out,
grpc_resolved_address addr_normalized;
char ntop_buf[INET6_ADDRSTRLEN];
const void* ip = nullptr;
- int port;
+ int port = 0;
uint32_t sin6_scope_id = 0;
int ret;
diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h
index 090470d49e..e3bd51a4ad 100644
--- a/src/core/lib/iomgr/sockaddr_utils.h
+++ b/src/core/lib/iomgr/sockaddr_utils.h
@@ -21,10 +21,6 @@
#include "src/core/lib/iomgr/resolve_address.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Returns true if addr is an IPv4-mapped IPv6 address within the
::ffff:0.0.0.0/96 range, or false otherwise.
@@ -81,8 +77,4 @@ const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* addr);
int grpc_sockaddr_get_family(const grpc_resolved_address* resolved_addr);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
diff --git a/src/core/lib/iomgr/socket_factory_posix.h b/src/core/lib/iomgr/socket_factory_posix.h
index e8257b07c4..af57cc5b60 100644
--- a/src/core/lib/iomgr/socket_factory_posix.h
+++ b/src/core/lib/iomgr/socket_factory_posix.h
@@ -23,10 +23,6 @@
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/resolve_address.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/** The virtual table of grpc_socket_factory */
typedef struct {
/** Replacement for socket(2) */
@@ -68,8 +64,4 @@ int grpc_socket_factory_compare(grpc_socket_factory* a, grpc_socket_factory* b);
grpc_socket_factory* grpc_socket_factory_ref(grpc_socket_factory* factory);
void grpc_socket_factory_unref(grpc_socket_factory* factory);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H */
diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h
index b4103f7e93..0a97cf657f 100644
--- a/src/core/lib/iomgr/socket_mutator.h
+++ b/src/core/lib/iomgr/socket_mutator.h
@@ -24,10 +24,6 @@
#include <stdbool.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/** The virtual table of grpc_socket_mutator */
typedef struct {
/** Mutates the socket opitons of \a fd */
@@ -60,8 +56,4 @@ int grpc_socket_mutator_compare(grpc_socket_mutator* a, grpc_socket_mutator* b);
grpc_socket_mutator* grpc_socket_mutator_ref(grpc_socket_mutator* mutator);
void grpc_socket_mutator_unref(grpc_socket_mutator* mutator);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */
diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h
index 4816ab6be7..9fd141b6de 100644
--- a/src/core/lib/iomgr/socket_utils.h
+++ b/src/core/lib/iomgr/socket_utils.h
@@ -21,15 +21,7 @@
#include <stddef.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */
const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 7a9c8139e7..77df4205ff 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -29,10 +29,6 @@
#include "src/core/lib/iomgr/socket_factory_posix.h"
#include "src/core/lib/iomgr/socket_mutator.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
int cloexec);
@@ -133,8 +129,4 @@ grpc_error* grpc_create_dualstack_socket_using_factory(
grpc_socket_factory* factory, const grpc_resolved_address* addr, int type,
int protocol, grpc_dualstack_mode* dsmode, int* newfd);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index c3ad99d82f..04e0a89d70 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -31,10 +31,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
inside the winsocket wrapper. */
@@ -114,10 +110,6 @@ void grpc_socket_become_ready(grpc_exec_ctx* exec_ctx,
grpc_winsocket* winsocket,
grpc_winsocket_callback_info* ci);
-#ifdef __cplusplus
-}
-#endif
-
#endif
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index c18d8a9316..75e2fe0f36 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -25,10 +25,6 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
@@ -41,8 +37,4 @@ void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* on_connect,
const grpc_resolved_address* addr,
grpc_millis deadline);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index cb0f627c94..4cb2ac49d5 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -43,7 +43,7 @@
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
-extern grpc_tracer_flag grpc_tcp_trace;
+extern grpc_core::TraceFlag grpc_tcp_trace;
typedef struct {
gpr_mu mu;
@@ -99,7 +99,7 @@ done:
static void tc_on_alarm(grpc_exec_ctx* exec_ctx, void* acp, grpc_error* error) {
int done;
async_connect* ac = (async_connect*)acp;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
str);
@@ -138,7 +138,7 @@ static void on_writable(grpc_exec_ctx* exec_ctx, void* acp, grpc_error* error) {
GRPC_ERROR_REF(error);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: error=%s",
ac->addr_str, str);
@@ -317,7 +317,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx* exec_ctx,
grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
ac->addr_str, fdobj);
}
@@ -334,13 +334,11 @@ done:
}
// overridden by api_fuzzer.c
-extern "C" {
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_millis deadline) = tcp_client_connect_impl;
-}
void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
grpc_endpoint** ep,
diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h
index 13d917891e..2b1fe79e90 100644
--- a/src/core/lib/iomgr/tcp_client_posix.h
+++ b/src/core/lib/iomgr/tcp_client_posix.h
@@ -23,16 +23,8 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/tcp_client.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
grpc_endpoint* grpc_tcp_client_create_from_fd(
grpc_exec_ctx* exec_ctx, grpc_fd* fd, const grpc_channel_args* channel_args,
const char* addr_str);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc
index 15345c8091..5cca0c9936 100644
--- a/src/core/lib/iomgr/tcp_client_uv.cc
+++ b/src/core/lib/iomgr/tcp_client_uv.cc
@@ -32,7 +32,7 @@
#include "src/core/lib/iomgr/tcp_uv.h"
#include "src/core/lib/iomgr/timer.h"
-extern grpc_tracer_flag grpc_tcp_trace;
+extern grpc_core::TraceFlag grpc_tcp_trace;
typedef struct grpc_uv_tcp_connect {
uv_connect_t connect_req;
@@ -59,7 +59,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx* exec_ctx, void* acp,
grpc_error* error) {
int done;
grpc_uv_tcp_connect* connect = (grpc_uv_tcp_connect*)acp;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
connect->addr_name, str);
@@ -147,7 +147,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx* exec_ctx,
connect->connect_req.data = connect;
connect->refs = 2; // One for the connect operation, one for the timer.
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
connect->addr_name);
}
@@ -161,13 +161,11 @@ static void tcp_client_connect_impl(grpc_exec_ctx* exec_ctx,
}
// overridden by api_fuzzer.c
-extern "C" {
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_millis deadline) = tcp_client_connect_impl;
-}
void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
grpc_endpoint** ep,
diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc
index 103e6b78de..5e30725e90 100644
--- a/src/core/lib/iomgr/tcp_client_windows.cc
+++ b/src/core/lib/iomgr/tcp_client_windows.cc
@@ -226,13 +226,11 @@ failure:
}
// overridden by api_fuzzer.c
-extern "C" {
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_millis deadline) = tcp_client_connect_impl;
-}
void grpc_tcp_client_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
grpc_endpoint** ep,
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index cb90933e31..d09cfca9af 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -61,7 +61,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
typedef size_t msg_iovlen_type;
#endif
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
+grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
typedef struct {
grpc_endpoint base;
@@ -81,9 +81,7 @@ typedef struct {
grpc_slice_buffer* incoming_buffer;
grpc_slice_buffer* outgoing_buffer;
- /** slice within outgoing_buffer to write next */
- size_t outgoing_slice_idx;
- /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
+ /** byte within outgoing_buffer->slices[0] to write next */
size_t outgoing_byte_idx;
grpc_closure* read_cb;
@@ -121,7 +119,7 @@ static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx,
static void done_poller(grpc_exec_ctx* exec_ctx, void* bp,
grpc_error* error_ignored) {
backup_poller* p = (backup_poller*)bp;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p);
}
grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p));
@@ -131,7 +129,7 @@ static void done_poller(grpc_exec_ctx* exec_ctx, void* bp,
static void run_poller(grpc_exec_ctx* exec_ctx, void* bp,
grpc_error* error_ignored) {
backup_poller* p = (backup_poller*)bp;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
}
gpr_mu_lock(p->pollset_mu);
@@ -147,18 +145,18 @@ static void run_poller(grpc_exec_ctx* exec_ctx, void* bp,
gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) {
gpr_mu_lock(p->pollset_mu);
bool cas_ok = gpr_atm_full_cas(&g_backup_poller, (gpr_atm)p, 0);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok);
}
gpr_mu_unlock(p->pollset_mu);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p);
}
grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p),
GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
grpc_schedule_on_exec_ctx));
} else {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p);
}
GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE);
@@ -169,7 +167,7 @@ static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
backup_poller* p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller);
gpr_atm old_count =
gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count,
(int)old_count - 1);
}
@@ -180,14 +178,14 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
backup_poller* p;
gpr_atm old_count =
gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count,
2 + (int)old_count);
}
if (old_count == 0) {
GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx);
p = (backup_poller*)gpr_zalloc(sizeof(*p) + grpc_pollset_size());
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p);
}
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
@@ -203,7 +201,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
// spin waiting for backup poller
}
}
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp);
}
grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd);
@@ -213,7 +211,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
}
static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp);
}
GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
@@ -222,7 +220,7 @@ static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
}
static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp);
}
cover_self(exec_ctx, tcp);
@@ -234,7 +232,7 @@ static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx,
void* arg, grpc_error* error) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error));
}
drop_uncovered(exec_ctx, (grpc_tcp*)arg);
@@ -311,7 +309,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
const char* reason, const char* file, int line) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
@@ -324,7 +322,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
@@ -355,7 +353,7 @@ static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
grpc_error* error) {
grpc_closure* cb = tcp->read_cb;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
size_t i;
const char* str = grpc_error_string(error);
@@ -451,7 +449,7 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp,
grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)tcpp;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
grpc_error_string(error));
}
@@ -470,13 +468,13 @@ static void tcp_continue_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
size_t target_read_size = get_target_read_size(tcp);
if (tcp->incoming_buffer->length < target_read_size &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp);
}
grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
target_read_size, 1, tcp->incoming_buffer);
} else {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp);
}
tcp_do_read(exec_ctx, tcp);
@@ -487,7 +485,7 @@ static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)arg;
GPR_ASSERT(!tcp->finished_edge);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
}
@@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
size_t unwind_slice_idx;
size_t unwind_byte_idx;
+ // We always start at zero, because we eagerly unref and trim the slice
+ // buffer as we write
+ size_t outgoing_slice_idx = 0;
+
for (;;) {
sending_length = 0;
- unwind_slice_idx = tcp->outgoing_slice_idx;
+ unwind_slice_idx = outgoing_slice_idx;
unwind_byte_idx = tcp->outgoing_byte_idx;
- for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
+ for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count &&
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
GRPC_SLICE_START_PTR(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
+ tcp->outgoing_buffer->slices[outgoing_slice_idx]) +
tcp->outgoing_byte_idx;
iov[iov_size].iov_len =
- GRPC_SLICE_LENGTH(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
+ GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) -
tcp->outgoing_byte_idx;
sending_length += iov[iov_size].iov_len;
- tcp->outgoing_slice_idx++;
+ outgoing_slice_idx++;
tcp->outgoing_byte_idx = 0;
}
GPR_ASSERT(iov_size > 0);
@@ -574,16 +575,25 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
if (sent_length < 0) {
if (errno == EAGAIN) {
- tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
+ // unref all and forget about all slices that have been written to this
+ // point
+ for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
+ grpc_slice_unref_internal(
+ exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer));
+ }
return false;
} else if (errno == EPIPE) {
*error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->outgoing_buffer);
return true;
} else {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->outgoing_buffer);
return true;
}
}
@@ -593,9 +603,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
while (trailing > 0) {
size_t slice_length;
- tcp->outgoing_slice_idx--;
- slice_length = GRPC_SLICE_LENGTH(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
+ outgoing_slice_idx--;
+ slice_length =
+ GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]);
if (slice_length > trailing) {
tcp->outgoing_byte_idx = slice_length - trailing;
break;
@@ -604,11 +614,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
}
}
- if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
+ if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->outgoing_buffer);
return true;
}
- };
+ }
}
static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
@@ -625,14 +637,14 @@ static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
}
if (!tcp_flush(exec_ctx, tcp, &error)) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "write: delayed");
}
notify_on_write(exec_ctx, tcp);
} else {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write: %s", str);
}
@@ -647,7 +659,7 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_error* error = GRPC_ERROR_NONE;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
size_t i;
for (i = 0; i < buf->count; i++) {
@@ -672,18 +684,17 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
return;
}
tcp->outgoing_buffer = buf;
- tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
if (!tcp_flush(exec_ctx, tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "write: delayed");
}
notify_on_write(exec_ctx, tcp);
} else {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write: %s", str);
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index ff1060b0ff..09051b7ed6 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -33,11 +33,7 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-extern grpc_tracer_flag grpc_tcp_trace;
+extern grpc_core::TraceFlag grpc_tcp_trace;
/* Create a tcp endpoint given a file desciptor and a read slice size.
Takes ownership of fd. */
@@ -57,8 +53,4 @@ int grpc_tcp_fd(grpc_endpoint* ep);
void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
int* fd, grpc_closure* done);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index ef983199b8..a1757a2b3e 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -25,10 +25,6 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
@@ -102,8 +98,4 @@ void grpc_tcp_server_unref(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s);
void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx* exec_ctx,
grpc_tcp_server* s);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index f84fa9751d..6fed13c6c7 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -243,7 +243,7 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* err) {
addr_str = grpc_sockaddr_to_uri(&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
index 608fba3346..6046f257f9 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix.h
+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
@@ -24,10 +24,6 @@
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_server.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* one listening port */
typedef struct grpc_tcp_listener {
int fd;
@@ -121,8 +117,4 @@ grpc_error* grpc_tcp_server_prepare_socket(int fd,
/* Ruturn true if the platform supports ifaddrs */
bool grpc_tcp_server_have_ifaddrs(void);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 72443cc29e..5139760634 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -55,7 +55,7 @@ static void init_max_accept_queue_size(void) {
if (fgets(buf, sizeof buf, fp)) {
char* end;
long i = strtol(buf, &end, 10);
- if (i > 0 && i <= INT_MAX && end && *end == 0) {
+ if (i > 0 && i <= INT_MAX && end && *end == '\n') {
n = (int)i;
}
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc
index 0eed4d428f..ffadf0b1ab 100644
--- a/src/core/lib/iomgr/tcp_server_uv.cc
+++ b/src/core/lib/iomgr/tcp_server_uv.cc
@@ -213,7 +213,7 @@ static void finish_accept(grpc_exec_ctx* exec_ctx, grpc_tcp_listener* sp) {
} else {
gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err));
}
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
if (peer_name_string) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s",
sp->server, peer_name_string);
@@ -247,7 +247,7 @@ static void on_connect(uv_stream_t* server, int status) {
GPR_ASSERT(!sp->has_pending_connection);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server);
}
@@ -260,15 +260,36 @@ static void on_connect(uv_stream_t* server, int status) {
grpc_exec_ctx_finish(&exec_ctx);
}
-static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
- const grpc_resolved_address* addr,
- unsigned port_index,
- grpc_tcp_listener** listener) {
+static grpc_error* add_addr_to_server(grpc_tcp_server* s,
+ const grpc_resolved_address* addr,
+ unsigned port_index,
+ grpc_tcp_listener** listener) {
grpc_tcp_listener* sp = NULL;
int port = -1;
int status;
grpc_error* error;
grpc_resolved_address sockname_temp;
+ uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
+ int family = grpc_sockaddr_get_family(addr);
+
+ status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+ if (family == AF_INET || family == AF_INET6) {
+ int fd;
+ uv_fileno((uv_handle_t*)handle, &fd);
+ int enable = 1;
+ setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
+ }
+#endif /* GPR_LINUX && SO_REUSEPORT */
+
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Failed to initialize UV tcp handle");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ grpc_slice_from_static_string(uv_strerror(status)));
+ return error;
+ }
// The last argument to uv_tcp_bind is flags
status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0);
@@ -325,20 +346,48 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
return GRPC_ERROR_NONE;
}
+static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
+ unsigned port_index,
+ int requested_port,
+ grpc_tcp_listener** listener) {
+ grpc_resolved_address wild4;
+ grpc_resolved_address wild6;
+ grpc_tcp_listener* sp = nullptr;
+ grpc_tcp_listener* sp2 = nullptr;
+ grpc_error* v6_err = GRPC_ERROR_NONE;
+ grpc_error* v4_err = GRPC_ERROR_NONE;
+
+ grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
+ /* Try listening on IPv6 first. */
+ if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) ==
+ GRPC_ERROR_NONE) {
+ *listener = sp;
+ return GRPC_ERROR_NONE;
+ }
+
+ if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) ==
+ GRPC_ERROR_NONE) {
+ *listener = sp2;
+ return GRPC_ERROR_NONE;
+ }
+
+ grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Failed to add any wildcard listeners");
+ root_err = grpc_error_add_child(root_err, v6_err);
+ root_err = grpc_error_add_child(root_err, v4_err);
+ return root_err;
+}
+
grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* port) {
// This function is mostly copied from tcp_server_windows.c
grpc_tcp_listener* sp = NULL;
- uv_tcp_t* handle;
grpc_resolved_address addr6_v4mapped;
- grpc_resolved_address wildcard;
grpc_resolved_address* allocated_addr = NULL;
grpc_resolved_address sockname_temp;
unsigned port_index = 0;
- int status;
grpc_error* error = GRPC_ERROR_NONE;
- int family;
GRPC_UV_ASSERT_SAME_THREAD();
@@ -367,43 +416,20 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
}
}
- if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = &addr6_v4mapped;
- }
-
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if (grpc_sockaddr_is_wildcard(addr, port)) {
- grpc_sockaddr_make_wildcard6(*port, &wildcard);
-
- addr = &wildcard;
- }
-
- handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
-
- family = grpc_sockaddr_get_family(addr);
- status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
-#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
- if (family == AF_INET || family == AF_INET6) {
- int fd;
- uv_fileno((uv_handle_t*)handle, &fd);
- int enable = 1;
- setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
- }
-#endif /* GPR_LINUX && SO_REUSEPORT */
-
- if (status == 0) {
- error = add_socket_to_server(s, handle, addr, port_index, &sp);
+ error = add_wildcard_addrs_to_server(s, port_index, *port, &sp);
} else {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Failed to initialize UV tcp handle");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = &addr6_v4mapped;
+ }
+
+ error = add_addr_to_server(s, addr, port_index, &sp);
}
gpr_free(allocated_addr);
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
char* port_string;
grpc_sockaddr_to_string(&port_string, addr, 0);
const char* str = grpc_error_string(error);
@@ -435,7 +461,7 @@ void grpc_tcp_server_start(grpc_exec_ctx* exec_ctx, grpc_tcp_server* server,
(void)pollsets;
(void)pollset_count;
GRPC_UV_ASSERT_SAME_THREAD();
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "SERVER_START %p", server);
}
GPR_ASSERT(on_accept_cb);
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc
index ac9ca4ea11..40f4006203 100644
--- a/src/core/lib/iomgr/tcp_uv.cc
+++ b/src/core/lib/iomgr/tcp_uv.cc
@@ -38,7 +38,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
+grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
typedef struct {
grpc_endpoint base;
@@ -52,12 +52,12 @@ typedef struct {
grpc_closure* read_cb;
grpc_closure* write_cb;
- grpc_slice read_slice;
grpc_slice_buffer* read_slices;
grpc_slice_buffer* write_slices;
uv_buf_t* write_buffers;
grpc_resource_user* resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
bool shutting_down;
@@ -66,7 +66,6 @@ typedef struct {
} grpc_tcp;
static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
- grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->handle);
gpr_free(tcp->peer_string);
@@ -79,7 +78,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
const char* reason, const char* file, int line) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
@@ -92,7 +91,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
@@ -119,91 +118,117 @@ static void uv_close_callback(uv_handle_t* handle) {
grpc_exec_ctx_finish(&exec_ctx);
}
-static grpc_slice alloc_read_slice(grpc_exec_ctx* exec_ctx,
- grpc_resource_user* resource_user) {
- return grpc_resource_user_slice_malloc(exec_ctx, resource_user,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
-}
-
static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp* tcp = (grpc_tcp*)handle->data;
(void)suggested_size;
- buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slice);
- buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
+ /* Before calling uv_read_start, we allocate a buffer with exactly one slice
+ * to tcp->read_slices and wait for the callback indicating that the
+ * allocation was successful. So slices[0] should always exist here */
+ buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]);
+ buf->len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]);
grpc_exec_ctx_finish(&exec_ctx);
}
+static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
+ grpc_error* error) {
+ grpc_closure* cb = tcp->read_cb;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
+ size_t i;
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ tcp->read_slices = NULL;
+ tcp->read_cb = NULL;
+ GRPC_CLOSURE_RUN(exec_ctx, cb, error);
+}
+
static void read_callback(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
- grpc_slice sub;
grpc_error* error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp* tcp = (grpc_tcp*)stream->data;
- grpc_closure* cb = tcp->read_cb;
+ grpc_slice_buffer garbage;
if (nread == 0) {
// Nothing happened. Wait for the next callback
return;
}
TCP_UNREF(&exec_ctx, tcp, "read");
- tcp->read_cb = NULL;
// TODO(murgatroid99): figure out what the return value here means
uv_read_stop(stream);
if (nread == UV_EOF) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
+ grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices);
} else if (nread > 0) {
// Successful read
- sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
- grpc_slice_buffer_add(tcp->read_slices, sub);
- tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user);
error = GRPC_ERROR_NONE;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
- size_t i;
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "read: error=%s", str);
-
- for (i = 0; i < tcp->read_slices->count; i++) {
- char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
- dump);
- gpr_free(dump);
- }
+ if ((size_t)nread < tcp->read_slices->length) {
+ /* TODO(murgatroid99): Instead of discarding the unused part of the read
+ * buffer, reuse it as the next read buffer. */
+ grpc_slice_buffer_init(&garbage);
+ grpc_slice_buffer_trim_end(
+ tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage);
+ grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &garbage);
}
} else {
// nread < 0: Error
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed");
+ grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, tcp->read_slices);
}
- GRPC_CLOSURE_SCHED(&exec_ctx, cb, error);
+ call_read_cb(&exec_ctx, tcp, error);
grpc_exec_ctx_finish(&exec_ctx);
}
+static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp,
+ grpc_error* error) {
+ int status;
+ grpc_tcp* tcp = (grpc_tcp*)tcpp;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
+ grpc_error_string(error));
+ }
+ if (error == GRPC_ERROR_NONE) {
+ status =
+ uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start");
+ error = grpc_error_set_str(
+ error, GRPC_ERROR_STR_OS_ERROR,
+ grpc_slice_from_static_string(uv_strerror(status)));
+ }
+ }
+ if (error != GRPC_ERROR_NONE) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->read_slices);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(exec_ctx, tcp, "read");
+ }
+ if (grpc_tcp_trace.enabled()) {
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
+ }
+}
+
static void uv_endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_slice_buffer* read_slices, grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep;
- int status;
- grpc_error* error = GRPC_ERROR_NONE;
GRPC_UV_ASSERT_SAME_THREAD();
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices);
TCP_REF(tcp, "read");
- // TODO(murgatroid99): figure out what the return value here means
- status =
- uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback);
- if (status != 0) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
- }
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
- }
+ grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ tcp->read_slices);
}
static void write_callback(uv_write_t* req, int status) {
@@ -218,13 +243,11 @@ static void write_callback(uv_write_t* req, int status) {
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed");
}
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
}
gpr_free(tcp->write_buffers);
- grpc_resource_user_free(&exec_ctx, tcp->resource_user,
- sizeof(uv_buf_t) * tcp->write_slices->count);
GRPC_CLOSURE_SCHED(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -240,7 +263,7 @@ static void uv_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
uv_write_t* write_req;
GRPC_UV_ASSERT_SAME_THREAD();
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
size_t j;
for (j = 0; j < write_slices->count; j++) {
@@ -271,8 +294,6 @@ static void uv_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
tcp->write_cb = cb;
buffer_count = (unsigned int)tcp->write_slices->count;
buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * buffer_count);
- grpc_resource_user_alloc(exec_ctx, tcp->resource_user,
- sizeof(uv_buf_t) * buffer_count, NULL);
for (i = 0; i < buffer_count; i++) {
slice = &tcp->write_slices->slices[i];
buffers[i].base = (char*)GRPC_SLICE_START_PTR(*slice);
@@ -320,7 +341,7 @@ static void uv_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_error* why) {
grpc_tcp* tcp = (grpc_tcp*)ep;
if (!tcp->shutting_down) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(why);
gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str);
}
@@ -367,7 +388,7 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
}
@@ -381,8 +402,10 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false;
+ tcp->read_slices = NULL;
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
- tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user);
+ grpc_resource_user_slice_allocator_init(
+ &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h
index 708e8469e6..fd6d19049a 100644
--- a/src/core/lib/iomgr/tcp_uv.h
+++ b/src/core/lib/iomgr/tcp_uv.h
@@ -38,22 +38,14 @@
#include <uv.h>
-extern grpc_tracer_flag grpc_tcp_trace;
+extern grpc_core::TraceFlag grpc_tcp_trace;
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
-#ifdef __cplusplus
-extern "C" {
-#endif
-
grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
grpc_resource_quota* resource_quota,
char* peer_string);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_UV */
#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 04922b4037..33868cdc7a 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -49,7 +49,7 @@
#define GRPC_FIONBIO FIONBIO
#endif
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
+grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
static grpc_error* set_non_block(SOCKET sock) {
int status;
@@ -124,7 +124,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
const char* reason, const char* file, int line) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
@@ -137,7 +137,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
int line) {
- if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 9c7ccdf132..28287e2795 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -35,10 +35,6 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/socket_windows.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
@@ -48,10 +44,6 @@ grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_winsocket* socket,
grpc_error* grpc_tcp_prepare_socket(SOCKET sock);
-#ifdef __cplusplus
-}
-#endif
-
#endif
#endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */
diff --git a/src/core/lib/iomgr/time_averaged_stats.h b/src/core/lib/iomgr/time_averaged_stats.h
index d38ed272b6..8745f7fa13 100644
--- a/src/core/lib/iomgr/time_averaged_stats.h
+++ b/src/core/lib/iomgr/time_averaged_stats.h
@@ -19,10 +19,6 @@
#ifndef GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H
#define GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* This tracks a time-decaying weighted average. It works by collecting
batches of samples and then mixing their average into a time-decaying
weighted mean. It is designed for batch operations where we do many adds
@@ -74,8 +70,4 @@ void grpc_time_averaged_stats_add_sample(grpc_time_averaged_stats* stats,
value. */
double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats* stats);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index cd8334eceb..b9acce229e 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -32,10 +32,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct grpc_timer grpc_timer;
/* Initialize *timer. When expired or canceled, closure will be called with
@@ -106,8 +102,4 @@ void grpc_timer_consume_kick(void);
void grpc_kick_poller(void);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_H */
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index a4bfbcb342..fa95c43dbe 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -42,11 +42,8 @@
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
-extern "C" {
-grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer");
-grpc_tracer_flag grpc_timer_check_trace =
- GRPC_TRACER_INITIALIZER(false, "timer_check");
-}
+grpc_core::TraceFlag grpc_timer_trace(false, "timer");
+grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");
/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
* deadlines earlier than 'queue_deadline" cap are maintained in the heap and
@@ -253,8 +250,6 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx);
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
- grpc_register_tracer(&grpc_timer_trace);
- grpc_register_tracer(&grpc_timer_check_trace);
for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i];
@@ -339,7 +334,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer,
timer->hash_table_next = nullptr;
#endif
- if (GRPC_TRACER_ON(grpc_timer_trace)) {
+ if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG,
"TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb);
@@ -375,7 +370,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer,
timer->heap_index = INVALID_HEAP_INDEX;
list_join(&shard->list, timer);
}
- if (GRPC_TRACER_ON(grpc_timer_trace)) {
+ if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG,
" .. add to shard %d with queue_deadline_cap=%" PRIdPTR
" => is_first_timer=%s",
@@ -397,7 +392,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer,
grpc_timer_check. */
if (is_first_timer) {
gpr_mu_lock(&g_shared_mutables.mu);
- if (GRPC_TRACER_ON(grpc_timer_trace)) {
+ if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
shard->min_deadline);
}
@@ -427,7 +422,7 @@ void grpc_timer_cancel(grpc_exec_ctx* exec_ctx, grpc_timer* timer) {
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
gpr_mu_lock(&shard->mu);
- if (GRPC_TRACER_ON(grpc_timer_trace)) {
+ if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
timer->pending ? "true" : "false");
}
@@ -468,7 +463,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
(gpr_atm)(deadline_delta * 1000.0));
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR,
(int)(shard - g_shards), shard->queue_deadline_cap);
}
@@ -476,7 +471,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
next = timer->next;
if (timer->deadline < shard->queue_deadline_cap) {
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap",
timer->deadline);
}
@@ -493,7 +488,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
grpc_timer* timer;
for (;;) {
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s",
(int)(shard - g_shards),
grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
@@ -503,13 +498,13 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
if (!refill_heap(shard, now)) return nullptr;
}
timer = grpc_timer_heap_top(&shard->heap);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG,
" .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR,
timer->deadline, now);
}
if (timer->deadline > now) return nullptr;
- if (GRPC_TRACER_ON(grpc_timer_trace)) {
+ if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler",
timer, now - timer->deadline,
timer->closure->scheduler->vtable->name);
@@ -534,7 +529,7 @@ static size_t pop_timers(grpc_exec_ctx* exec_ctx, timer_shard* shard,
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR,
(int)(shard - g_shards), n);
}
@@ -558,7 +553,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx,
gpr_mu_lock(&g_shared_mutables.mu);
result = GRPC_TIMERS_CHECKED_AND_EMPTY;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
(int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline);
@@ -576,7 +571,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx,
result = GRPC_TIMERS_FIRED;
}
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG,
" .. result --> %d"
", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
@@ -621,7 +616,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx,
if (next != nullptr) {
*next = GPR_MIN(*next, min_timer);
}
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG,
"TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
min_timer);
@@ -635,7 +630,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx,
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
// tracing
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
char* next_str;
if (next == nullptr) {
next_str = gpr_strdup("NULL");
@@ -653,7 +648,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx,
grpc_timer_check_result r =
run_some_expired_timers(exec_ctx, now, next, shutdown_error);
// tracing
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
char* next_str;
if (next == nullptr) {
next_str = gpr_strdup("NULL");
diff --git a/src/core/lib/iomgr/timer_heap.h b/src/core/lib/iomgr/timer_heap.h
index ae56e5a73e..436eef55a6 100644
--- a/src/core/lib/iomgr/timer_heap.h
+++ b/src/core/lib/iomgr/timer_heap.h
@@ -21,10 +21,6 @@
#include "src/core/lib/iomgr/timer.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct {
grpc_timer** timers;
uint32_t timer_count;
@@ -43,8 +39,4 @@ void grpc_timer_heap_pop(grpc_timer_heap* heap);
int grpc_timer_heap_is_empty(grpc_timer_heap* heap);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index acc40b6c9e..87ed0e05dc 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -33,7 +33,7 @@ typedef struct completed_thread {
struct completed_thread* next;
} completed_thread;
-extern "C" grpc_tracer_flag grpc_timer_check_trace;
+extern grpc_core::TraceFlag grpc_timer_check_trace;
// global mutex
static gpr_mu g_mu;
@@ -81,7 +81,7 @@ static void start_timer_thread_and_unlock(void) {
++g_waiter_count;
++g_thread_count;
gpr_mu_unlock(&g_mu);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "Spawn timer thread");
}
gpr_thd_options opt = gpr_thd_options_default();
@@ -93,7 +93,7 @@ static void start_timer_thread_and_unlock(void) {
// to leak through g_completed_threads and be freed in gc_completed_threads()
// before "&ct->t" is written to, causing a use-after-free.
gpr_mu_lock(&g_mu);
- gpr_thd_new(&ct->t, timer_thread, ct, &opt);
+ gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt);
gpr_mu_unlock(&g_mu);
}
@@ -115,7 +115,7 @@ static void run_some_timers(grpc_exec_ctx* exec_ctx) {
// if there's no thread waiting with a timeout, kick an existing
// waiter so that the next deadline is not missed
if (!g_has_timed_waiter) {
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
}
gpr_cv_signal(&g_cv_wait);
@@ -123,7 +123,7 @@ static void run_some_timers(grpc_exec_ctx* exec_ctx) {
gpr_mu_unlock(&g_mu);
}
// without our lock, flush the exec_ctx
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "flush exec_ctx");
}
grpc_exec_ctx_flush(exec_ctx);
@@ -178,7 +178,7 @@ static bool wait_until(grpc_exec_ctx* exec_ctx, grpc_millis next) {
g_has_timed_waiter = true;
g_timed_waiter_deadline = next;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx);
gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds",
wait_time);
@@ -188,15 +188,14 @@ static bool wait_until(grpc_exec_ctx* exec_ctx, grpc_millis next) {
}
}
- if (GRPC_TRACER_ON(grpc_timer_check_trace) &&
- next == GRPC_MILLIS_INF_FUTURE) {
+ if (grpc_timer_check_trace.enabled() && next == GRPC_MILLIS_INF_FUTURE) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
gpr_cv_wait(&g_cv_wait, &g_mu,
grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME));
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation,
g_kicked);
@@ -226,10 +225,6 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) {
grpc_millis next = GRPC_MILLIS_INF_FUTURE;
grpc_exec_ctx_invalidate_now(exec_ctx);
- /* Calibrate g_start_time in exec_ctx.cc with a regular interval in case the
- * system clock has changed */
- grpc_exec_ctx_maybe_update_start_time(exec_ctx);
-
// check timer state, updates next to the next time to run a check
switch (grpc_timer_check(exec_ctx, &next)) {
case GRPC_TIMERS_FIRED:
@@ -245,7 +240,7 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) {
Consequently, we can just sleep forever here and be happy at some
saved wakeup cycles. */
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
}
next = GRPC_MILLIS_INF_FUTURE;
@@ -271,7 +266,7 @@ static void timer_thread_cleanup(completed_thread* ct) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "End timer thread");
}
}
@@ -314,18 +309,18 @@ void grpc_timer_manager_init(void) {
static void stop_threads(void) {
gpr_mu_lock(&g_mu);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "stop timer threads: threaded=%d", g_threaded);
}
if (g_threaded) {
g_threaded = false;
gpr_cv_broadcast(&g_cv_wait);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
while (g_thread_count > 0) {
gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
gc_completed_threads();
diff --git a/src/core/lib/iomgr/timer_manager.h b/src/core/lib/iomgr/timer_manager.h
index 72960d6ffc..0ba502928a 100644
--- a/src/core/lib/iomgr/timer_manager.h
+++ b/src/core/lib/iomgr/timer_manager.h
@@ -21,10 +21,6 @@
#include <stdbool.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Timer Manager tries to keep one thread waiting for the next timeout at all
times */
@@ -38,8 +34,4 @@ void grpc_timer_manager_set_threading(bool enabled);
* disabled */
void grpc_timer_manager_tick(void);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */
diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc
index df40e54ae6..fac2026fa9 100644
--- a/src/core/lib/iomgr/timer_uv.cc
+++ b/src/core/lib/iomgr/timer_uv.cc
@@ -29,11 +29,8 @@
#include <uv.h>
-extern "C" {
-grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer");
-grpc_tracer_flag grpc_timer_check_trace =
- GRPC_TRACER_INITIALIZER(false, "timer_check");
-}
+grpc_core::TraceFlag grpc_timer_trace(false, "timer");
+grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");
static void timer_close_callback(uv_handle_t* handle) { gpr_free(handle); }
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 68ab9355ca..7b7d6946b1 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -47,6 +47,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -71,14 +72,22 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ // To be scheduled on another thread to actually read/write.
+ grpc_closure do_read_closure;
+ grpc_closure do_write_closure;
+ grpc_closure notify_on_write_closure;
// True if orphan_cb is trigered.
bool orphan_notified;
+ // True if grpc_fd_notify_on_write() is called after on_write() call.
+ bool notify_on_write_armed;
+ // True if fd has been shutdown.
+ bool already_shutdown;
struct grpc_udp_listener* next;
};
struct shutdown_fd_args {
- grpc_fd* fd;
+ grpc_udp_listener* sp;
gpr_mu* server_mu;
};
@@ -144,8 +153,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args,
grpc_error* error) {
struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
+ grpc_udp_listener* sp = shutdown_args->sp;
+ gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
gpr_mu_lock(shutdown_args->server_mu);
- grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error));
+ grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error));
+ sp->already_shutdown = true;
+ if (!sp->notify_on_write_armed) {
+ // Re-arm write notification to notify listener with error. This is
+ // necessary to decrement active_ports.
+ sp->notify_on_write_armed = true;
+ grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ }
gpr_mu_unlock(shutdown_args->server_mu);
gpr_free(shutdown_args);
}
@@ -161,6 +179,7 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
gpr_mu_destroy(&s->mu);
+ gpr_log(GPR_DEBUG, "Destroy all listeners.");
while (s->head) {
grpc_udp_listener* sp = s->head;
s->head = sp->next;
@@ -207,9 +226,10 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
- GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+ GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
+ gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
}
@@ -233,13 +253,14 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
s->shutdown_complete = on_done;
+ gpr_log(GPR_DEBUG, "start to destroy udp_server");
/* shutdown all fd's */
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
struct shutdown_fd_args* args =
(struct shutdown_fd_args*)gpr_malloc(sizeof(*args));
- args->fd = sp->emfd;
+ args->sp = sp;
args->server_mu = &s->mu;
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
@@ -329,6 +350,28 @@ error:
return -1;
}
+static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+ GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
+ /* TODO: the reason we hold server->mu here is merely to prevent fd
+ * shutdown while we are reading. However, it blocks do_write(). Switch to
+ * read lock if available. */
+ gpr_mu_lock(&sp->server->mu);
+ /* Tell the registered callback that data is available to read. */
+ if (!sp->already_shutdown &&
+ sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+ /* There maybe more packets to read. Schedule read_more_cb_ closure to run
+ * after finishing this event loop. */
+ GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+ } else {
+ /* Finish reading all the packets, re-arm the notification event so we can
+ * get another chance to read. Or fd already shutdown, re-arm to get a
+ * notification with shutdown error. */
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
+
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
@@ -343,13 +386,51 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
}
return;
}
-
- /* Tell the registered callback that data is available to read. */
+ /* Read once. If there is more data to read, off load the work to another
+ * thread to finish. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data);
+ if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+ /* There maybe more packets to read. Schedule read_more_cb_ closure to run
+ * after finishing this event loop. */
+ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
+ GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+ } else {
+ /* Finish reading all the packets, re-arm the notification event so we can
+ * get another chance to read. Or fd already shutdown, re-arm to get a
+ * notification with shutdown error. */
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
+
+// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
+void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+ gpr_mu_lock(&sp->server->mu);
+ if (!sp->notify_on_write_armed) {
+ grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ sp->notify_on_write_armed = true;
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
- /* Re-arm the notification event so we get another chance to read. */
- grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+ gpr_mu_lock(&(sp->server->mu));
+ if (sp->already_shutdown) {
+ // If fd has been shutdown, don't write any more and re-arm notification.
+ grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ } else {
+ sp->notify_on_write_armed = false;
+ /* Tell the registered callback that the socket is writeable. */
+ GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
+ arg, grpc_schedule_on_exec_ctx);
+ sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data,
+ &sp->notify_on_write_closure);
+ }
gpr_mu_unlock(&sp->server->mu);
}
@@ -367,12 +448,11 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
return;
}
- /* Tell the registered callback that the socket is writeable. */
- GPR_ASSERT(sp->write_cb);
- sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data);
+ /* Schedule actual write in another thread. */
+ GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
- /* Re-arm the notification event so we get another chance to write. */
- grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&sp->server->mu);
}
@@ -409,6 +489,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
sp->orphan_notified = false;
+ sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -533,6 +614,7 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
+ sp->notify_on_write_armed = true;
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
/* Registered for both read and write callbacks: increment active_ports
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index bca0f049fb..1bd6922de6 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -23,10 +23,6 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/* Forward decl of struct grpc_server */
/* This is not typedef'ed to avoid a typedef-redefinition error */
struct grpc_server;
@@ -34,13 +30,16 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
-/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
+/* Called when data is available to read from the socket.
+ * Return true if there is more data to read from fd. */
+typedef bool (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
void* user_data);
-/* Called when the socket is writeable. */
+/* Called when the socket is writeable. The given closure should be scheduled
+ * when the socket becomes blocked next time. */
typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
- void* user_data);
+ void* user_data,
+ grpc_closure* notify_on_write_closure);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx* exec_ctx,
@@ -77,8 +76,4 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* server,
grpc_closure* on_done);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */
diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h
index be3c33d9c2..1c079e6e76 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.h
+++ b/src/core/lib/iomgr/unix_sockets_posix.h
@@ -25,10 +25,6 @@
#include "src/core/lib/iomgr/resolve_address.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
void grpc_create_socketpair_if_unix(int sv[2]);
grpc_error* grpc_resolve_unix_domain_address(
@@ -42,8 +38,4 @@ void grpc_unlink_if_unix_domain_socket(
char* grpc_sockaddr_to_uri_unix_if_possible(
const grpc_resolved_address* resolved_addr);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index dcd7bdb560..017e41bfa8 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -40,10 +40,6 @@
#define GRPC_FD_TO_IDX(fd) (-(fd)-1)
#define GRPC_IDX_TO_FD(idx) (-(idx)-1)
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef struct cv_node {
gpr_cv* cv;
struct cv_node* next;
@@ -68,8 +64,4 @@ typedef struct cv_fd_table {
extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */
diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.h b/src/core/lib/iomgr/wakeup_fd_pipe.h
index 9bbb5e2ff7..326a0c4e01 100644
--- a/src/core/lib/iomgr/wakeup_fd_pipe.h
+++ b/src/core/lib/iomgr/wakeup_fd_pipe.h
@@ -21,14 +21,6 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
extern const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable;
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_PIPE_H */
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index ae7849f98c..a9584d0d48 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -49,10 +49,6 @@
#include "src/core/lib/iomgr/error.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
void grpc_wakeup_fd_global_init(void);
void grpc_wakeup_fd_global_destroy(void);
@@ -95,8 +91,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info);
* wakeup_fd_nospecial.c if no such implementation exists. */
extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable;
-#ifdef __cplusplus
-}
-#endif
-
#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H */