diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 37 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.c | 35 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer.h | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 114 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.c | 45 |
12 files changed, 140 insertions, 158 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 1f8d7eef26..4dc22fbcea 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <string.h> #include <sys/socket.h> @@ -198,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -846,7 +847,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) { static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; @@ -914,7 +915,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd_watcher *watchers; struct pollfd *pfds; - timeout = poll_deadline_to_millis_timeout(deadline, now); + timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -1024,13 +1025,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + deadline = 0; } keep_polling = 1; } - if (keep_polling) { - now = gpr_now(now.clock_type); - } } gpr_tls_set(&g_current_thread_poller, 0); if (added_worker) { @@ -1082,21 +1080,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; + if (deadline == 0) return 0; + grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + if (n < 0) return 0; + if (n > INT_MAX) return -1; + return (int)n; } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index cff77e641c..682794f527 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -208,9 +208,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); + grpc_pollset_worker **worker, + grpc_millis deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline); } grpc_error *grpc_pollset_kick(grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 0de7333843..21279877e8 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -52,8 +52,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, + grpc_millis deadline); grpc_error *(*pollset_kick)(grpc_pollset *pollset, grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 833170ceed..5d2fc4af86 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -108,9 +108,42 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -void grpc_exec_ctx_global_init(void) {} +static gpr_timespec g_start_time; + +void grpc_exec_ctx_global_init(void) { + g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); +} + void grpc_exec_ctx_global_shutdown(void) {} +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { + exec_ctx->now_is_valid = false; +} + +gpr_timespec grpc_millis_to_timespec(grpc_exec_ctx *exec_ctx, + grpc_millis millis, + gpr_clock_type clock_type) { + return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type), + gpr_time_from_millis(millis, GPR_TIMESPAN)); +} + static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index a0d2a965d5..8e030bf741 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -19,9 +19,13 @@ #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H +#include <grpc/support/atm.h> + #include "src/core/lib/iomgr/closure.h" -/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ +typedef gpr_atm grpc_millis; + +#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ @@ -64,12 +68,18 @@ struct grpc_exec_ctx { uintptr_t flags; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); + + bool now_is_valid; + grpc_millis now; }; /* initializer for grpc_exec_ctx: prefer to use GRPC_EXEC_CTX_INIT whenever possible */ -#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ - { GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check } +#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ + { \ + GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check, \ + false, 0 \ + } /* initialize an execution context at the top level of an API call into grpc (this is safe to use elsewhere, though possibly not as efficient) */ @@ -101,4 +111,11 @@ void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx); +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx); +gpr_timespec grpc_millis_to_timespec(grpc_exec_ctx *exec_ctx, + grpc_millis millis, gpr_clock_type clock); +grpc_millis grpc_timespec_to_millis(grpc_exec_ctx *exec_ctx, + gpr_timespec timespec); + #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 3d19953eeb..5e15b01ffd 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -47,8 +47,14 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); +<<<<<<< HEAD + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_timer_list_init(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); +======= grpc_executor_init(exec_ctx); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); +>>>>>>> 4708a21c8144f9fed3c90ea8fa153aa20302a1d7 g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_network_status_init(); @@ -95,8 +101,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == - GRPC_TIMERS_FIRED) { + exec_ctx->now_is_valid = true; + exec_ctx->now = GRPC_MILLIS_INF_FUTURE; + if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index a609a3877a..49d4251894 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -71,8 +71,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); pollset lock */ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) GRPC_MUST_USE_RESULT; + grpc_pollset_worker **worker, + grpc_millis deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is non-NULL, then kick that worker. */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 6c9e51ae84..c04fb8bc07 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -35,6 +35,6 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline); + grpc_millis deadline); #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 21e320a6e7..992e1868c0 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -242,7 +242,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -322,9 +322,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -338,14 +336,14 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index b92b8fb8b8..89c997bdca 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -41,8 +41,7 @@ typedef struct grpc_timer grpc_timer; application callback is also responsible for maintaining information about when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now); + grpc_millis deadline, grpc_closure *closure); /* Note that there is no timer destroy function. This is because the timer is a one-time occurrence with a guarantee that the callback will @@ -88,8 +87,8 @@ typedef enum { with high probability at least one thread in the system will see an update at any time slice. */ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next); -void grpc_timer_list_init(gpr_timespec now); + grpc_millis *next); +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); /* Consume a kick issued by grpc_kick_poller */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 12efce241f..11d0b338f1 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -99,9 +99,6 @@ static struct shared_mutables g_shared_mutables = { .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, }; -static gpr_clock_type g_clock_type; -static gpr_timespec g_start_time; - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -114,51 +111,18 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm *next, grpc_error *error); -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - -static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = GPR_MS_PER_SEC * (double)ts.tv_sec + - (double)ts.tv_nsec / GPR_NS_PER_MS + - (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = - GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_timespec atm_to_timespec(gpr_atm x) { - return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); -} - static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_timer_list_init(gpr_timespec now) { +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { uint32_t i; g_shared_mutables.initialized = true; gpr_mu_init(&g_shared_mutables.mu); - g_clock_type = now.clock_type; - g_start_time = now; - g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); grpc_register_tracer(&grpc_timer_trace); @@ -193,10 +157,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { g_shared_mutables.initialized = false; } -static double ts_to_dbl(gpr_timespec ts) { - return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; -} - /* returns true if the first element in the list */ static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; @@ -235,20 +195,15 @@ static void note_deadline_change(timer_shard *shard) { } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { int is_first_timer = 0; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; - GPR_ASSERT(deadline.clock_type == g_clock_type); - GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline); if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR - "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", - timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec, - now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); + gpr_log(GPR_DEBUG, + "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, + deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); } if (!g_shared_mutables.initialized) { @@ -261,7 +216,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_mu_lock(&shard->mu); timer->pending = true; - if (gpr_time_cmp(deadline, now) <= 0) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline <= now) { timer->pending = false; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); @@ -269,9 +225,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, return; } - grpc_time_averaged_stats_add_sample(&shard->stats, - ts_to_dbl(gpr_time_sub(deadline, now))); - if (deadline_atm < shard->queue_deadline_cap) { + grpc_time_averaged_stats_add_sample(&shard->stats, (deadline - now) / 1000.0); + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -302,12 +257,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, shard->min_deadline); } - if (deadline_atm < shard->min_deadline) { + if (deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline_atm; + shard->min_deadline = deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) { - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm); + if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline); grpc_kick_poller(); } } @@ -500,29 +455,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { // prelude - GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm_round_down(now); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ - gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); - if (now_atm < min_timer) { + grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { if (next != NULL) { - *next = - atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); + *next = GPR_MIN(*next, min_timer); } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, - "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, - now_atm, min_timer); + "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, + min_timer); } return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + now != GRPC_MILLIS_INF_FUTURE ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); @@ -532,34 +485,23 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, timespec_to_atm_round_down(*next)); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR - "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, - now.tv_sec, now.tv_nsec, now_atm, next_str, - gpr_tls_get(&g_last_seen_min_timer), + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR + " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } // actual code - grpc_timer_check_result r; - gpr_atm next_atm; - if (next == NULL) { - r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); - } else { - next_atm = timespec_to_atm_round_down(*next); - r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); - *next = atm_to_timespec(next_atm); - } + bool r = run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, next_atm); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 631f7935d9..751314c23f 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -52,7 +52,7 @@ static bool g_kicked; static bool g_has_timed_waiter; // the deadline of the current timed waiter thread (only relevant if // g_has_timed_waiter is true) -static gpr_timespec g_timed_waiter_deadline; +static grpc_millis g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -96,9 +96,8 @@ static void start_timer_thread_and_unlock(void) { void grpc_timer_manager_tick() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_timer_check(&exec_ctx, now, &next); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_timer_check(&exec_ctx, &next); grpc_exec_ctx_finish(&exec_ctx); } @@ -133,8 +132,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { // wait until 'next' (or forever if there is already a timed waiter in the pool) // returns true if the thread should continue executing (false if it should // shutdown) -static bool wait_until(gpr_timespec next) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); +static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) { gpr_mu_lock(&g_mu); // if we're not threaded anymore, leave if (!g_threaded) { @@ -168,30 +166,29 @@ static bool wait_until(gpr_timespec next) { unless their 'next' is earlier than the current timed-waiter's deadline (in which case the thread with earlier 'next' takes over as the new timed waiter) */ - if (gpr_time_cmp(next, inf_future) != 0) { - if (!g_has_timed_waiter || - (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + if (next != GRPC_MILLIS_INF_FUTURE) { + if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) { my_timed_waiter_generation = ++g_timed_waiter_generation; g_has_timed_waiter = true; g_timed_waiter_deadline = next; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = - gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); + gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", + wait_time); } } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; } } if (GRPC_TRACER_ON(grpc_timer_check_trace) && - gpr_time_cmp(next, inf_future) == 0) { + next == GRPC_MILLIS_INF_FUTURE) { gpr_log(GPR_DEBUG, "sleep until kicked"); } - gpr_cv_wait(&g_cv_wait, &g_mu, next); + gpr_cv_wait(&g_cv_wait, &g_mu, + grpc_millis_to_timespec(exec_ctx, next, GPR_CLOCK_REALTIME)); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", @@ -203,7 +200,7 @@ static bool wait_until(gpr_timespec next) { // there's work to do after checking timers (code above) if (my_timed_waiter_generation == g_timed_waiter_generation) { g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; } } @@ -219,12 +216,10 @@ static bool wait_until(gpr_timespec next) { } static void timer_main_loop(grpc_exec_ctx *exec_ctx) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = inf_future; - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; // check timer state, updates next to the next time to run a check - switch (grpc_timer_check(exec_ctx, now, &next)) { + switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: run_some_timers(exec_ctx); break; @@ -241,10 +236,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: - if (!wait_until(next)) { + if (!wait_until(exec_ctx, next)) { return; } break; @@ -300,7 +295,7 @@ void grpc_timer_manager_init(void) { g_completed_threads = NULL; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; start_threads(); } @@ -347,7 +342,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); |