author     Craig Tiller <ctiller@google.com>    2017-10-12 00:27:00 +0000
committer  Craig Tiller <ctiller@google.com>    2017-10-12 00:27:00 +0000
commit     6b9a99c58d93cef4e538b9f33c68534e2a972505 (patch)
tree       52b37c364df4d74b8fae769c559415ef4f900cce /src/core/lib/iomgr
parent     513daab34b6761986237f81793be7627e1fcf77a (diff)
parent     2ac511f44c000fd73951ddd268a4c86d76665e1b (diff)
Merge github.com:grpc/grpc into epexinf
Diffstat (limited to 'src/core/lib/iomgr')
61 files changed, 442 insertions, 356 deletions
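The recurring change across the pollers below replaces (now, deadline) gpr_timespec pairs with a single grpc_millis deadline; each engine's poll_deadline_to_millis_timeout then just clamps the remaining time to the int timeout that epoll_wait()/poll() expect (-1 for "wait forever", 0 for an already-expired deadline, INT_MAX as the cap). A minimal standalone sketch of that clamping follows; millis_t, MILLIS_INF_FUTURE, and deadline_to_timeout are hypothetical stand-ins for grpc_millis, GRPC_MILLIS_INF_FUTURE, and the per-engine helpers, and the "now" argument stands in for the cached grpc_exec_ctx_now(exec_ctx) value used in the real code.

    #include <limits.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Stand-ins for grpc_millis (milliseconds since process start) and
       GRPC_MILLIS_INF_FUTURE; the real definitions live in exec_ctx.h. */
    typedef int64_t millis_t;
    #define MILLIS_INF_FUTURE INT64_MAX

    /* Clamp a millisecond deadline to the int timeout epoll_wait()/poll()
       expect: -1 means block indefinitely, 0 means poll without blocking. */
    static int deadline_to_timeout(millis_t deadline, millis_t now) {
      if (deadline == MILLIS_INF_FUTURE) return -1;
      millis_t delta = deadline - now;
      if (delta > INT_MAX) return INT_MAX;
      if (delta < 0) return 0;
      return (int)delta;
    }

    int main(void) {
      printf("%d\n", deadline_to_timeout(MILLIS_INF_FUTURE, 5000)); /* -1  */
      printf("%d\n", deadline_to_timeout(4000, 5000));              /* 0   */
      printf("%d\n", deadline_to_timeout(5250, 5000));              /* 250 */
      return 0;
    }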
diff --git a/src/core/lib/iomgr/block_annotate.h b/src/core/lib/iomgr/block_annotate.h new file mode 100644 index 0000000000..fcbfe9eb1a --- /dev/null +++ b/src/core/lib/iomgr/block_annotate.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H +#define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H + +#ifdef __cplusplus +extern "C" { +#endif + +void gpr_thd_start_blocking_region(); +void gpr_thd_end_blocking_region(); + +#ifdef __cplusplus +} +#endif + +/* These annotations identify the beginning and end of regions where + the code may block for reasons other than synchronization functions. + These include poll, epoll, and getaddrinfo. */ + +#ifdef GRPC_SCHEDULING_MARK_BLOCKING_REGION +#define GRPC_SCHEDULING_START_BLOCKING_REGION \ + do { \ + gpr_thd_start_blocking_region(); \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + gpr_thd_end_blocking_region(); \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + gpr_thd_end_blocking_region(); \ + grpc_exec_ctx_invalidate_now((ec)); \ + } while (0) +#else +#define GRPC_SCHEDULING_START_BLOCKING_REGION \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + grpc_exec_ctx_invalidate_now((ec)); \ + } while (0) +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H */ diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 16ff0ab733..21347d9023 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -103,4 +103,4 @@ struct grpc_endpoint { } #endif -#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */ diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index f8830022f4..ee91795749 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -37,4 +37,4 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h index f718e06d4e..8746d5d353 100644 --- a/src/core/lib/iomgr/error_internal.h +++ b/src/core/lib/iomgr/error_internal.h @@ -65,4 +65,4 @@ bool grpc_error_is_special(grpc_error *err); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 689aac15bf..6126e2771c 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -39,12 +40,12 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; @@ -561,25 +562,17 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_END("pollset_shutdown", 0); } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) { + return INT_MAX; + } else if (delta < 0) { return 0; + } else { + return (int)delta; } - - static const gpr_timespec round_up = { - 0, /* tv_sec */ - GPR_NS_PER_MS - 1, /* tv_nsec */ - GPR_TIMESPAN /* clock_type */ - }; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? millis : 1; } /* Process the epoll events found by do_epoll_wait() function. @@ -636,11 +629,11 @@ static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx, (i.e the designated poller thread) will be calling this function. 
So there is no need for any synchronization when accesing fields in g_epoll_set */ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("do_epoll_wait", 0); int r; - int timeout = poll_deadline_to_millis_timeout(deadline, now); + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (timeout != 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; } @@ -650,7 +643,7 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -668,9 +661,10 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { GPR_TIMER_BEGIN("begin_worker", 0); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -755,14 +749,15 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->shutting_down); } - if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && + if (gpr_cv_wait(&worker->cv, &pollset->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) && worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); } } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -941,7 +936,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; @@ -952,7 +947,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } - if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, ps, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!ps->shutting_down); @@ -975,8 +970,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, designated poller */ if (gpr_atm_acq_load(&g_epoll_set.cursor) == gpr_atm_acq_load(&g_epoll_set.num_events)) { - append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline), - err_desc); + append_error(&error, do_epoll_wait(exec_ctx, ps, deadline), err_desc); } append_error(&error, process_epoll_events(exec_ctx, ps), err_desc); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.h b/src/core/lib/iomgr/ev_epoll1_linux.h index 66fd826b49..b437032b36 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.h +++ b/src/core/lib/iomgr/ev_epoll1_linux.h @@ -34,4 +34,4 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 93f9d2feff..a44cdb8597 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -25,6 +25,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -38,7 +39,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h" #include "src/core/lib/iomgr/lockfree_event.h" @@ -46,7 +47,6 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" // debug aid: create workers on the heap (allows asan to spot @@ -651,32 +651,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &pollset->mu; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - - static const gpr_timespec round_up = { - 0, /* tv_sec */ - GPR_NS_PER_MS - 1, /* tv_nsec */ - GPR_TIMESPAN /* clock_type */ - }; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? 
millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -781,9 +765,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, gpr_timespec now, - gpr_timespec deadline) { - int timeout = poll_deadline_to_millis_timeout(deadline, now); + pollable *p, grpc_millis deadline) { + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); @@ -800,7 +783,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -855,9 +838,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, } /* Return true if this thread should poll */ -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { bool do_poll = true; if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -875,10 +859,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, worker->pollable_obj, worker, - poll_deadline_to_millis_timeout(deadline, *now)); + poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable_obj->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, deadline)) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, worker->pollable_obj, worker); @@ -896,7 +881,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->pollable_obj, worker); } } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } gpr_mu_unlock(&worker->pollable_obj->mu); @@ -932,7 +917,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP grpc_pollset_worker *worker = (grpc_pollset_worker *)gpr_malloc(sizeof(*worker)); @@ -942,10 +927,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, #define WORKER_PTR (&worker) #endif if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 - ".%09d deadline=%" PRId64 ".%09d kwp=%d pollable=%p", - pollset, worker_hdl, WORKER_PTR, now.tv_sec, now.tv_nsec, - deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller, + gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR " kwp=%d pollable=%p", + pollset, worker_hdl, 
WORKER_PTR,grpc_exec_ctx_now(exec_ctx), + deadline, pollset->kicked_without_poller, pollset->active_pollable); } static const char *err_desc = "pollset_work"; @@ -953,7 +937,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; } else { - if (begin_worker(pollset, WORKER_PTR, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); GPR_ASSERT(!pollset->shutdown_closure); @@ -961,7 +945,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->event_cursor == pollset->event_count) { append_error(&error, pollset_epoll(exec_ctx, pollset, WORKER_PTR->pollable_obj, - now, deadline), + deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), diff --git a/src/core/lib/iomgr/ev_epollex_linux.h b/src/core/lib/iomgr/ev_epollex_linux.h index 58cc5a24f8..2849a23283 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.h +++ b/src/core/lib/iomgr/ev_epollex_linux.h @@ -33,4 +33,4 @@ const grpc_event_engine_vtable *grpc_init_epollex_linux( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index c8e07c6e18..035bdc4cb5 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/port.h" +#include <grpc/grpc_posix.h> + /* This polling engine is only relevant on linux kernels supporting epoll() */ #ifdef GRPC_LINUX_EPOLL @@ -25,6 +27,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <signal.h> @@ -40,13 +43,13 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -1089,30 +1092,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutdown_done = NULL; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); - return millis >= 1 ? 
millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -1243,7 +1232,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (ep_rv < 0) { if (errno != EINTR) { gpr_asprintf(&err_msg, @@ -1310,10 +1299,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("pollset_work", 0); grpc_error *error = GRPC_ERROR_NONE; - int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); + int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline); sigset_t new_mask; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index e170702dca..036a35690c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <string.h> #include <sys/socket.h> @@ -37,12 +38,11 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/murmur_hash.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -50,7 +50,6 @@ /******************************************************************************* * FD declarations */ - typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; @@ -200,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -876,7 +875,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) { static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; @@ -945,7 +944,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd_watcher *watchers; struct pollfd *pfds; - timeout = poll_deadline_to_millis_timeout(deadline, now); + timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -991,7 +990,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GRPC_SCHEDULING_START_BLOCKING_REGION; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); r = grpc_poll_function(pfds, 
pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); @@ -1068,13 +1067,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + deadline = 0; } keep_polling = 1; } - if (keep_polling) { - now = gpr_now(now.clock_type); - } } gpr_tls_set(&g_current_thread_poller, 0); if (added_worker) { @@ -1126,21 +1122,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; + if (deadline == 0) return 0; + grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + if (n < 0) return 0; + if (n > INT_MAX) return -1; + return (int)n; } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h index 84b68155b5..861257204b 100644 --- a/src/core/lib/iomgr/ev_poll_posix.h +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -32,4 +32,4 @@ const grpc_event_engine_vtable *grpc_init_poll_cv_posix(bool explicit_request); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 80269881cb..5656ebb340 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -61,10 +61,28 @@ typedef struct { event_engine_factory_fn factory; } event_engine_factory; +namespace { +extern "C" { +int dummypoll(struct pollfd fds[], nfds_t nfds, int timeout) { + gpr_log(GPR_ERROR, "Attempted to poll despite declaring non-polling."); + GPR_ASSERT(false); + return -1; +} +} // extern "C" + +const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { + // return the simplest engine as a dummy but also override the poller + auto ret = grpc_init_poll_posix(explicit_request); + grpc_poll_function = dummypoll; + return ret; +} +} // namespace + static const event_engine_factory g_factories[] = { {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, + {"none", init_non_polling}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { @@ -203,9 +221,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); + grpc_pollset_worker **worker, + grpc_millis deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline); } grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 5ad1c13ee6..bc4456c2a2 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,8 +56,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, + grpc_millis deadline); grpc_error *(*pollset_kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -166,4 +166,4 @@ const grpc_event_engine_vtable *grpc_get_event_engine_test_only(); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 41c69add17..3d17afcb8f 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -104,9 +104,69 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -void grpc_exec_ctx_global_init(void) {} +static gpr_timespec + g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the + // last enum value in + // gpr_clock_type + +void grpc_exec_ctx_global_init(void) { + for (int i = 0; i < GPR_TIMESPAN; i++) { + g_start_time[i] = gpr_now((gpr_clock_type)i); + } + // allows uniform treatment in conversion functions + g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN); +} + void grpc_exec_ctx_global_shutdown(void) {} +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { + exec_ctx->now_is_valid = false; +} + +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, + gpr_clock_type clock_type) { + if (clock_type == GPR_TIMESPAN) { + return gpr_time_from_millis(millis, GPR_TIMESPAN); + } + return gpr_time_add(g_start_time[clock_type], + gpr_time_from_millis(millis, GPR_TIMESPAN)); +} + +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { + return timespec_to_atm_round_down(ts); +} + +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { + return timespec_to_atm_round_up(ts); +} + static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index a93728f0a6..44b9be7aa9 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -19,14 +19,19 @@ #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H +#include <grpc/support/atm.h> #include <grpc/support/cpu.h> + #include "src/core/lib/iomgr/closure.h" #ifdef __cplusplus extern "C" { #endif -/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ +typedef gpr_atm grpc_millis; + +#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX +#define GRPC_MILLIS_INF_PAST GPR_ATM_MIN /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. 
*/ @@ -70,6 +75,9 @@ struct grpc_exec_ctx { unsigned starting_cpu; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); + + bool now_is_valid; + grpc_millis now; }; /* initializer for grpc_exec_ctx: @@ -77,7 +85,7 @@ struct grpc_exec_ctx { #define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ { \ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, gpr_cpu_current_cpu(), \ - finish_check_arg, finish_check \ + finish_check_arg, finish_check, false, 0 \ } /* initialize an execution context at the top level of an API call into grpc @@ -110,6 +118,12 @@ void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx); +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx); +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index ebe7f240b4..92c3e70301 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -178,6 +178,7 @@ static void executor_thread(void *arg) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } + grpc_exec_ctx_invalidate_now(&exec_ctx); subtract_depth = run_closures(&exec_ctx, exec); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index ab3fc901de..ef5ac56c83 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -53,4 +53,4 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc index c082179c0b..78185cc084 100644 --- a/src/core/lib/iomgr/iocp_windows.cc +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -21,11 +21,13 @@ #ifdef GRPC_WINSOCK_SOCKET #include <winsock2.h> +#include <limits> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_windows.h> #include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/iocp_windows.h" @@ -40,25 +42,20 @@ static gpr_atm g_custom_events = 0; static HANDLE g_iocp; -static DWORD deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { +static DWORD deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) { return INFINITE; } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return (DWORD)gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline < now) return 0; + grpc_millis timeout = deadline - now; + if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE; + return static_cast<DWORD>(deadline - now); } grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, - gpr_timespec deadline) { + grpc_millis deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -67,9 +64,10 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket; grpc_winsocket_callback_info *info; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - success = GetQueuedCompletionStatus( - g_iocp, &bytes, &completion_key, &overlapped, - deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + success = + GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, + deadline_to_millis_timeout(exec_ctx, deadline)); + grpc_exec_ctx_invalidate_now(exec_ctx); if (success == 0 && overlapped == NULL) { return GRPC_IOCP_WORK_TIMEOUT; } @@ -121,7 +119,7 @@ void grpc_iocp_flush(void) { grpc_iocp_work_status work_status; do { - work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + work_status = grpc_iocp_work(&exec_ctx, GRPC_MILLIS_INF_PAST); } while (work_status == GRPC_IOCP_WORK_KICK || grpc_exec_ctx_flush(&exec_ctx)); } @@ -129,7 +127,7 @@ void grpc_iocp_flush(void) { void grpc_iocp_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (gpr_atm_acq_load(&g_custom_events)) { - grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_iocp_work(&exec_ctx, GRPC_MILLIS_INF_FUTURE); grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h index 341c159501..4efbc94645 100644 --- a/src/core/lib/iomgr/iocp_windows.h +++ b/src/core/lib/iomgr/iocp_windows.h @@ -34,7 +34,7 @@ typedef enum { } grpc_iocp_work_status; grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, - gpr_timespec deadline); + grpc_millis deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_flush(void); @@ -45,4 +45,4 @@ void grpc_iocp_add_socket(grpc_winsocket *); } #endif 
-#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */ diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 3a0605833a..d6a5b4a76c 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -51,7 +51,7 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); grpc_executor_init(exec_ctx); - grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_list_init(exec_ctx); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = (char *)"root"; grpc_network_status_init(); @@ -98,8 +98,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == - GRPC_TIMERS_FIRED) { + exec_ctx->now_is_valid = true; + exec_ctx->now = GRPC_MILLIS_INF_FUTURE; + if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index fea08496fe..6c0a08b918 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -40,4 +40,4 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */ diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index 005abbed13..52db37c89a 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -48,4 +48,4 @@ bool grpc_iomgr_abort_on_leaks(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/is_epollexclusive_available.h b/src/core/lib/iomgr/is_epollexclusive_available.h index 5c3e483065..9ae9c5c191 100644 --- a/src/core/lib/iomgr/is_epollexclusive_available.h +++ b/src/core/lib/iomgr/is_epollexclusive_available.h @@ -31,4 +31,4 @@ bool grpc_is_epollexclusive_available(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IS_EPOLLEXCLUSIVE_AVAILABLE_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IS_EPOLLEXCLUSIVE_AVAILABLE_H */ diff --git a/src/core/lib/iomgr/load_file.cc b/src/core/lib/iomgr/load_file.cc index 0b4d41ea4b..5cb4099ea4 100644 --- a/src/core/lib/iomgr/load_file.cc +++ b/src/core/lib/iomgr/load_file.cc @@ -25,7 +25,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/support/string.h" grpc_error *grpc_load_file(const char *filename, int add_null_terminator, @@ -73,6 +73,6 @@ end: GRPC_ERROR_UNREF(error); error = error_out; } - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; return error; } diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h index 925f004945..02229e569e 100644 --- a/src/core/lib/iomgr/lockfree_event.h +++ b/src/core/lib/iomgr/lockfree_event.h @@ -45,4 +45,4 @@ void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index af50d51257..cba38d4530 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,4 +35,4 @@ void grpc_network_status_shutdown_all_endpoints(); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index 4a37acf212..009f968fac 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -72,4 +72,4 @@ void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index 28d63949ea..799fae154c 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -75,8 +75,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); pollset lock */ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) GRPC_MUST_USE_RESULT; + grpc_pollset_worker **worker, + grpc_millis deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is non-NULL, then kick that worker. */ diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h index 17df86542d..5455eda02f 100644 --- a/src/core/lib/iomgr/pollset_set.h +++ b/src/core/lib/iomgr/pollset_set.h @@ -52,4 +52,4 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */ diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index 7ea5019ad5..b9901bf8ef 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -116,13 +116,14 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { uint64_t timeout; GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { - if (gpr_time_cmp(deadline, now) >= 0) { - timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline >= now) { + timeout = deadline - now; } else { timeout = 0; } diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index d8f72ff867..5cc9faf4ff 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -32,4 +32,4 @@ void grpc_pollset_global_shutdown(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */ diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index eb295d3eeb..bb4df83fc1 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -110,7 +110,7 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {} grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; @@ -159,7 +159,8 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, &worker); added_worker = 1; while (!worker.kicked) { - if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { break; } } diff --git a/src/core/lib/iomgr/pollset_windows.h b/src/core/lib/iomgr/pollset_windows.h index 7733d26471..2479b25286 100644 --- a/src/core/lib/iomgr/pollset_windows.h +++ b/src/core/lib/iomgr/pollset_windows.h @@ -68,4 +68,4 @@ void grpc_pollset_global_shutdown(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 4a6df2cf26..5f0634299e 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -60,4 +60,4 @@ extern grpc_error *(*grpc_blocking_resolve_address)( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 60cfeebd47..1b783495df 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -33,10 +33,10 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_error *blocking_resolve_address_impl( @@ -81,7 +81,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { /* Retry if well-known service name is recognized */ @@ -90,7 +90,7 @@ static grpc_error *blocking_resolve_address_impl( if (strcmp(port, svc[i][0]) == 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; break; } } diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index abcfc2114d..451f01a701 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -34,10 +34,10 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" typedef struct { @@ -87,7 +87,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo"); goto done; diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 60262435b3..ecb5747da8 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -89,6 +89,8 @@ struct grpc_resource_user { grpc_closure_list on_allocated; /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; + /* How many bytes of allocations are outstanding */ + int64_t outstanding_allocations; /* True if we are currently trying to add ourselves to the non-free quota list, false otherwise */ bool added_to_free_pool; @@ -153,6 +155,9 @@ struct grpc_resource_quota { char *name; }; +static void ru_unref_by(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, gpr_atm amount); + /******************************************************************************* * list management */ @@ -289,6 +294,25 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR + " free_pool=%" 
PRId64, + resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown), + resource_user->free_pool); + } + if (gpr_atm_no_barrier_load(&resource_user->shutdown)) { + resource_user->allocating = false; + grpc_closure_list_fail_all( + &resource_user->on_allocated, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); + int64_t aborted_allocations = resource_user->outstanding_allocations; + resource_user->outstanding_allocations = 0; + resource_user->free_pool += aborted_allocations; + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); + gpr_mu_unlock(&resource_user->mu); + ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations); + continue; + } if (resource_user->free_pool < 0 && -resource_user->free_pool <= resource_quota->free_pool) { int64_t amt = -resource_user->free_pool; @@ -308,6 +332,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } if (resource_user->free_pool >= 0) { resource_user->allocating = false; + resource_user->outstanding_allocations = 0; GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { @@ -488,6 +513,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, } static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + gpr_log(GPR_DEBUG, "RU shutdown %p", ru); + } grpc_resource_user *resource_user = (grpc_resource_user *)ru; GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); @@ -497,6 +525,9 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { resource_user->reclaimers[1] = NULL; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); + if (resource_user->allocating) { + rq_step_sched(exec_ctx, resource_user->resource_quota); + } } static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { @@ -718,6 +749,7 @@ grpc_resource_user *grpc_resource_user_create( resource_user->reclaimers[1] = NULL; resource_user->new_reclaimers[0] = NULL; resource_user->new_reclaimers[1] = NULL; + resource_user->outstanding_allocations = 0; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } @@ -778,6 +810,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&resource_user->mu); ru_ref_by(resource_user, (gpr_atm)size); resource_user->free_pool -= (int64_t)size; + resource_user->outstanding_allocations += (int64_t)size; if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, @@ -792,6 +825,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } } else { + resource_user->outstanding_allocations -= (int64_t)size; GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index 3afb525434..1d4249b7e2 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -158,4 +158,4 @@ grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 129bb54fc9..1fd552febb 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -85,4 +85,4 @@ int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h index f319e931b6..d6c538ec6f 100644 --- a/src/core/lib/iomgr/socket_utils.h +++ b/src/core/lib/iomgr/socket_utils.h @@ -32,4 +32,4 @@ const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 623b83f08b..73809b68d3 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -137,4 +137,4 @@ grpc_error *grpc_create_dualstack_socket_using_factory( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index a00a7615a3..84fa071e89 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -115,4 +115,4 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 18cf6114f2..b2f365f2af 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -39,10 +39,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline); + grpc_millis deadline); #ifdef __cplusplus } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 7d9e9533fd..5611dd9062 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -48,7 +48,6 @@ extern grpc_tracer_flag grpc_tcp_trace; typedef struct { gpr_mu mu; grpc_fd *fd; - gpr_timespec deadline; grpc_timer alarm; grpc_closure on_alarm; int refs; @@ -244,7 +243,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -325,9 +324,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -342,7 +339,7 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -350,7 +347,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h index 0b9775504c..8740511804 100644 --- a/src/core/lib/iomgr/tcp_client_posix.h +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -35,4 +35,4 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc index 83835978f4..f3e9366299 100644 --- a/src/core/lib/iomgr/tcp_client_uv.cc +++ b/src/core/lib/iomgr/tcp_client_uv.cc @@ -119,7 +119,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *resolved_addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_uv_tcp_connect *connect; grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); (void)channel_args; @@ -158,9 +158,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, uv_tc_on_connect); GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &connect->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &connect->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &connect->alarm, deadline, &connect->on_alarm); } // overridden by api_fuzzer.c @@ -169,7 +167,7 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -177,7 +175,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index 1154965c82..9adf7ee4e9 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -43,7 +43,6 @@ typedef struct { grpc_closure *on_done; gpr_mu mu; grpc_winsocket *socket; - gpr_timespec deadline; grpc_timer alarm; grpc_closure on_alarm; char *addr_name; @@ -126,7 +125,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { static void tcp_client_connect_impl( grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, - const grpc_resolved_address *addr, gpr_timespec deadline) { + const grpc_resolved_address *addr, grpc_millis deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; int status; @@ -206,8 +205,7 @@ static void tcp_client_connect_impl( GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm, - gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect); return; @@ -233,7 +231,7 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = 
 }
 void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
@@ -241,7 +239,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_pollset_set *interested_parties,
                              const grpc_channel_args *channel_args,
                              const grpc_resolved_address *addr,
-                             gpr_timespec deadline) {
+                             grpc_millis deadline) {
   grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
                                channel_args, addr, deadline);
 }
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 4489896ffb..dbcc976ae9 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -135,13 +135,11 @@ static void run_poller(grpc_exec_ctx *exec_ctx, void *bp,
     gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
   }
   gpr_mu_lock(p->pollset_mu);
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
-  gpr_timespec deadline =
-      gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN));
+  grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC;
   GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx);
-  GRPC_LOG_IF_ERROR("backup_poller:pollset_work",
-                    grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL,
-                                      now, deadline));
+  GRPC_LOG_IF_ERROR(
+      "backup_poller:pollset_work",
+      grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, deadline));
   gpr_mu_unlock(p->pollset_mu);
   /* last "uncovered" notification is the ref that keeps us polling, if we get
    * there try a cas to release it */
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index dda78b2f8e..47e78fa67e 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -61,4 +61,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 3f190ac285..8f9ce3819e 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -106,4 +106,4 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
index 4bb0660f09..6746333960 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix.h
+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
@@ -125,4 +125,4 @@ bool grpc_tcp_server_have_ifaddrs(void);
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h
index ba7db8a0f7..3399535b42 100644
--- a/src/core/lib/iomgr/tcp_uv.h
+++ b/src/core/lib/iomgr/tcp_uv.h
@@ -50,4 +50,4 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
diff --git a/src/core/lib/iomgr/time_averaged_stats.h b/src/core/lib/iomgr/time_averaged_stats.h
index e255b58fee..d38ed272b6 100644
--- a/src/core/lib/iomgr/time_averaged_stats.h
+++ b/src/core/lib/iomgr/time_averaged_stats.h
@@ -78,4 +78,4 @@ double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats* stats);
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 466600d582..419e834cf1 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -45,8 +45,7 @@ typedef struct grpc_timer grpc_timer;
    application callback is also responsible for maintaining information about
    when to free up any user-level state. */
 void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
-                     gpr_timespec deadline, grpc_closure *closure,
-                     gpr_timespec now);
+                     grpc_millis deadline, grpc_closure *closure);
 /* Initialize *timer without setting it. This can later be passed through
    the regular init or cancel */
@@ -96,8 +95,8 @@ typedef enum {
    with high probability at least one thread in the system will see an update
    at any time slice. */
 grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
-                                         gpr_timespec now, gpr_timespec *next);
-void grpc_timer_list_init(gpr_timespec now);
+                                         grpc_millis *next);
+void grpc_timer_list_init(grpc_exec_ctx *exec_ctx);
 void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);
 /* Consume a kick issued by grpc_kick_poller */
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 971d80d8bc..b8e895de6f 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -220,9 +220,6 @@ struct shared_mutables {
 static struct shared_mutables g_shared_mutables;
-static gpr_clock_type g_clock_type;
-static gpr_timespec g_start_time;
-
 static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
   if (a > GPR_ATM_MAX - b) {
     return GPR_ATM_MAX;
@@ -235,52 +232,19 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
                                                        gpr_atm *next,
                                                        grpc_error *error);
-static gpr_timespec dbl_to_ts(double d) {
-  gpr_timespec ts;
-  ts.tv_sec = (int64_t)d;
-  ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
-  ts.clock_type = GPR_TIMESPAN;
-  return ts;
-}
-
-static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
-  ts = gpr_time_sub(ts, g_start_time);
-  double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
-             (double)ts.tv_nsec / GPR_NS_PER_MS +
-             (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC;
-  if (x < 0) return 0;
-  if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
-  return (gpr_atm)x;
-}
-
-static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
-  ts = gpr_time_sub(ts, g_start_time);
-  double x =
-      GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
-  if (x < 0) return 0;
-  if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
-  return (gpr_atm)x;
-}
-
-static gpr_timespec atm_to_timespec(gpr_atm x) {
-  return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
-}
-
 static gpr_atm compute_min_deadline(timer_shard *shard) {
   return grpc_timer_heap_is_empty(&shard->heap)
              ? saturating_add(shard->queue_deadline_cap, 1)
              : grpc_timer_heap_top(&shard->heap)->deadline;
 }
-void grpc_timer_list_init(gpr_timespec now) {
+void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
   uint32_t i;
   g_shared_mutables.initialized = true;
   g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
   gpr_mu_init(&g_shared_mutables.mu);
-  g_clock_type = now.clock_type;
-  g_start_time = now;
-  g_shared_mutables.min_timer = timespec_to_atm_round_down(now);
+  g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx);
   gpr_tls_init(&g_last_seen_min_timer);
   gpr_tls_set(&g_last_seen_min_timer, 0);
   grpc_register_tracer(&grpc_timer_trace);
@@ -317,10 +281,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
   g_shared_mutables.initialized = false;
 }
-static double ts_to_dbl(gpr_timespec ts) {
-  return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
-}
-
 /* returns true if the first element in the list */
 static void list_join(grpc_timer *head, grpc_timer *timer) {
   timer->next = head;
@@ -361,24 +321,20 @@ static void note_deadline_change(timer_shard *shard) {
 void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; }
 void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
-                     gpr_timespec deadline, grpc_closure *closure,
-                     gpr_timespec now) {
+                     grpc_millis deadline, grpc_closure *closure) {
   int is_first_timer = 0;
   timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
-  GPR_ASSERT(deadline.clock_type == g_clock_type);
-  GPR_ASSERT(now.clock_type == g_clock_type);
   timer->closure = closure;
-  gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline);
+  timer->deadline = deadline;
 #ifndef NDEBUG
   timer->hash_table_next = NULL;
 #endif
   if (GRPC_TRACER_ON(grpc_timer_trace)) {
-    gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]",
-            timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec,
-            now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb);
+    gpr_log(GPR_DEBUG,
+            "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
+            deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb);
   }
   if (!g_shared_mutables.initialized) {
@@ -391,7 +347,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
   gpr_mu_lock(&shard->mu);
   timer->pending = true;
-  if (gpr_time_cmp(deadline, now) <= 0) {
+  grpc_millis now = grpc_exec_ctx_now(exec_ctx);
+  if (deadline <= now) {
     timer->pending = false;
     GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
     gpr_mu_unlock(&shard->mu);
@@ -400,11 +357,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
   grpc_time_averaged_stats_add_sample(&shard->stats,
-                                      ts_to_dbl(gpr_time_sub(deadline, now)));
+                                      (double)(deadline - now) / 1000.0);
   ADD_TO_HASH_TABLE(timer);
-  if (deadline_atm < shard->queue_deadline_cap) {
+  if (deadline < shard->queue_deadline_cap) {
     is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
   } else {
     timer->heap_index = INVALID_HEAP_INDEX;
@@ -435,12 +392,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
     gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
             shard->min_deadline);
   }
-  if (deadline_atm < shard->min_deadline) {
+  if (deadline < shard->min_deadline) {
     gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
-    shard->min_deadline = deadline_atm;
+    shard->min_deadline = deadline;
     note_deadline_change(shard);
-    if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) {
-      gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm);
+    if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
+      gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline);
       grpc_kick_poller();
     }
   }
@@ -544,8 +501,9 @@ static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
     }
     if (timer->deadline > now) return NULL;
     if (GRPC_TRACER_ON(grpc_timer_trace)) {
-      gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer,
-              now - timer->deadline);
+      gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler",
+              timer, now - timer->deadline,
+              timer->closure->scheduler->vtable->name);
     }
     timer->pending = false;
     grpc_timer_heap_pop(&shard->heap);
@@ -567,6 +525,10 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
   }
   *new_min_deadline = compute_min_deadline(shard);
   gpr_mu_unlock(&shard->mu);
+  if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+    gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR,
+            (int)(shard - g_shards), n);
+  }
   return n;
 }
@@ -639,29 +601,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
 }
 grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
-                                         gpr_timespec now, gpr_timespec *next) {
+                                         grpc_millis *next) {
   // prelude
-  GPR_ASSERT(now.clock_type == g_clock_type);
-  gpr_atm now_atm = timespec_to_atm_round_down(now);
+  grpc_millis now = grpc_exec_ctx_now(exec_ctx);
   /* fetch from a thread-local first: this avoids contention on a globally
      mutable cacheline in the common case */
-  gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
-  if (now_atm < min_timer) {
+  grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
+  if (now < min_timer) {
    if (next != NULL) {
-      *next =
-          atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer));
+      *next = GPR_MIN(*next, min_timer);
    }
    if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
      gpr_log(GPR_DEBUG,
-              "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
-              now_atm, min_timer);
+              "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
+              min_timer);
    }
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
   }
   grpc_error *shutdown_error =
-      gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
+      now != GRPC_MILLIS_INF_FUTURE
           ? GRPC_ERROR_NONE
           : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
@@ -671,34 +631,24 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
     if (next == NULL) {
       next_str = gpr_strdup("NULL");
     } else {
-      gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
-                   next->tv_nsec, timespec_to_atm_round_down(*next));
+      gpr_asprintf(&next_str, "%" PRIdPTR, *next);
     }
-    gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
-            now.tv_sec, now.tv_nsec, now_atm, next_str,
-            gpr_tls_get(&g_last_seen_min_timer),
+    gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
+            now, next_str, gpr_tls_get(&g_last_seen_min_timer),
             gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));
     gpr_free(next_str);
   }
   // actual code
-  grpc_timer_check_result r;
-  gpr_atm next_atm;
-  if (next == NULL) {
-    r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
-  } else {
-    next_atm = timespec_to_atm_round_down(*next);
-    r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error);
-    *next = atm_to_timespec(next_atm);
-  }
+  grpc_timer_check_result r =
+      run_some_expired_timers(exec_ctx, now, next, shutdown_error);
   // tracing
   if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
     char *next_str;
     if (next == NULL) {
       next_str = gpr_strdup("NULL");
     } else {
-      gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
-                   next->tv_nsec, next_atm);
+      gpr_asprintf(&next_str, "%" PRIdPTR, *next);
     }
     gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str);
     gpr_free(next_str);
diff --git a/src/core/lib/iomgr/timer_heap.h b/src/core/lib/iomgr/timer_heap.h
index f15e8a3abb..228d038ab3 100644
--- a/src/core/lib/iomgr/timer_heap.h
+++ b/src/core/lib/iomgr/timer_heap.h
@@ -47,4 +47,4 @@ int grpc_timer_heap_is_empty(grpc_timer_heap *heap);
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 9b54fab898..1248f82189 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -55,7 +55,7 @@ static bool g_kicked;
 static bool g_has_timed_waiter;
 // the deadline of the current timed waiter thread (only relevant if
 // g_has_timed_waiter is true)
-static gpr_timespec g_timed_waiter_deadline;
+static grpc_millis g_timed_waiter_deadline;
 // generation counter to track which thread is waiting for the next timer
 static uint64_t g_timed_waiter_generation;
@@ -99,9 +99,8 @@ static void start_timer_thread_and_unlock(void) {
 void grpc_timer_manager_tick() {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC);
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
-  grpc_timer_check(&exec_ctx, now, &next);
+  grpc_millis next = GRPC_MILLIS_INF_FUTURE;
+  grpc_timer_check(&exec_ctx, &next);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -124,6 +123,9 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
     gpr_mu_unlock(&g_mu);
   }
   // without our lock, flush the exec_ctx
+  if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+    gpr_log(GPR_DEBUG, "flush exec_ctx");
+  }
   grpc_exec_ctx_flush(exec_ctx);
   gpr_mu_lock(&g_mu);
   // garbage collect any threads hanging out that are dead
@@ -136,8 +138,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
 // wait until 'next' (or forever if there is already a timed waiter in the pool)
 // returns true if the thread should continue executing (false if it should
 // shutdown)
-static bool wait_until(gpr_timespec next) {
-  const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) {
   gpr_mu_lock(&g_mu);
   // if we're not threaded anymore, leave
   if (!g_threaded) {
@@ -171,30 +172,29 @@ static bool wait_until(gpr_timespec next) {
        unless their 'next' is earlier than the current timed-waiter's deadline
        (in which case the thread with earlier 'next' takes over as the new
        timed waiter) */
-    if (gpr_time_cmp(next, inf_future) != 0) {
-      if (!g_has_timed_waiter ||
-          (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) {
+    if (next != GRPC_MILLIS_INF_FUTURE) {
+      if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
         my_timed_waiter_generation = ++g_timed_waiter_generation;
         g_has_timed_waiter = true;
         g_timed_waiter_deadline = next;
         if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
-          gpr_timespec wait_time =
-              gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
-          gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
-                  wait_time.tv_sec, wait_time.tv_nsec);
+          grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx);
+          gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds",
+                  wait_time);
         }
       } else {  // g_timed_waiter == true && next >= g_timed_waiter_deadline
-        next = inf_future;
+        next = GRPC_MILLIS_INF_FUTURE;
       }
     }
     if (GRPC_TRACER_ON(grpc_timer_check_trace) &&
-        gpr_time_cmp(next, inf_future) == 0) {
+        next == GRPC_MILLIS_INF_FUTURE) {
       gpr_log(GPR_DEBUG, "sleep until kicked");
     }
-    gpr_cv_wait(&g_cv_wait, &g_mu, next);
+    gpr_cv_wait(&g_cv_wait, &g_mu,
+                grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME));
     if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
       gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
@@ -206,7 +206,7 @@ static bool wait_until(gpr_timespec next) {
     // there's work to do after checking timers (code above)
     if (my_timed_waiter_generation == g_timed_waiter_generation) {
       g_has_timed_waiter = false;
-      g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+      g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
     }
   }
@@ -222,12 +222,11 @@ static bool wait_until(gpr_timespec next) {
 }
 static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
-  const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
   for (;;) {
-    gpr_timespec next = inf_future;
-    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+    grpc_millis next = GRPC_MILLIS_INF_FUTURE;
+    grpc_exec_ctx_invalidate_now(exec_ctx);
     // check timer state, updates next to the next time to run a check
-    switch (grpc_timer_check(exec_ctx, now, &next)) {
+    switch (grpc_timer_check(exec_ctx, &next)) {
       case GRPC_TIMERS_FIRED:
         run_some_timers(exec_ctx);
         break;
@@ -244,10 +243,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
         if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
           gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
         }
-        next = inf_future;
+        next = GRPC_MILLIS_INF_FUTURE;
       /* fall through */
       case GRPC_TIMERS_CHECKED_AND_EMPTY:
-        if (!wait_until(next)) {
+        if (!wait_until(exec_ctx, next)) {
           return;
         }
         break;
@@ -303,7 +302,7 @@ void grpc_timer_manager_init(void) {
   g_completed_threads = NULL;
   g_has_timed_waiter = false;
-  g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+  g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
   start_threads();
 }
@@ -350,7 +349,7 @@ void grpc_kick_poller(void) {
   gpr_mu_lock(&g_mu);
   g_kicked = true;
   g_has_timed_waiter = false;
-  g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+  g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
   ++g_timed_waiter_generation;
   gpr_cv_signal(&g_cv_wait);
   gpr_mu_unlock(&g_mu);
diff --git a/src/core/lib/iomgr/timer_manager.h b/src/core/lib/iomgr/timer_manager.h
index d8a59a9477..72960d6ffc 100644
--- a/src/core/lib/iomgr/timer_manager.h
+++ b/src/core/lib/iomgr/timer_manager.h
@@ -42,4 +42,4 @@ void grpc_timer_manager_tick(void);
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */
diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc
index 53f79b545a..ccbbe357ae 100644
--- a/src/core/lib/iomgr/timer_uv.cc
+++ b/src/core/lib/iomgr/timer_uv.cc
@@ -55,19 +55,18 @@ void run_expired_timer(uv_timer_t *handle) {
 }
 void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
-                     gpr_timespec deadline, grpc_closure *closure,
-                     gpr_timespec now) {
+                     grpc_millis deadline, grpc_closure *closure) {
   uint64_t timeout;
   uv_timer_t *uv_timer;
   GRPC_UV_ASSERT_SAME_THREAD();
   timer->closure = closure;
-  if (gpr_time_cmp(deadline, now) <= 0) {
+  if (deadline <= grpc_exec_ctx_now(exec_ctx)) {
     timer->pending = 0;
     GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
     return;
   }
   timer->pending = 1;
-  timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
+  timeout = (uint64_t)(deadline - grpc_exec_ctx_now(exec_ctx));
   uv_timer = (uv_timer_t *)gpr_malloc(sizeof(uv_timer_t));
   uv_timer_init(uv_default_loop(), uv_timer);
   uv_timer->data = timer;
@@ -91,11 +90,11 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
 }
 grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
-                                         gpr_timespec now, gpr_timespec *next) {
+                                         grpc_millis *next) {
   return GRPC_TIMERS_NOT_CHECKED;
 }
-void grpc_timer_list_init(gpr_timespec now) {}
+void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {}
 void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {}
 void grpc_timer_consume_kick(void) {}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index bcd8572260..e887cb1bcf 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -81,4 +81,4 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */
diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h
index b96131ae1c..3e7f9c7d1e 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.h
+++ b/src/core/lib/iomgr/unix_sockets_posix.h
@@ -46,4 +46,4 @@ char *grpc_sockaddr_to_uri_unix_if_possible(
 }
 #endif
-#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
\ No newline at end of file
+#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
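For context on the API change recorded in the hunks above: after this patch, timer deadlines are absolute grpc_millis values read from the exec_ctx clock (grpc_exec_ctx_now) rather than gpr_timespec pairs, and grpc_timer_init no longer takes a separate "now" argument or clock-type conversion. A minimal caller sketch follows; it is illustrative only (the helpers arm_example_timer and on_timer are hypothetical and not part of this commit), and it uses only functions that appear in the diff.

/* Hypothetical caller sketch: arm a timer 100ms from now using the
   grpc_millis-based grpc_timer_init signature introduced by this change. */
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"

static void on_timer(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  /* runs when the timer expires (error == GRPC_ERROR_NONE) or is cancelled */
}

static void arm_example_timer(grpc_timer *timer, grpc_closure *closure) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  GRPC_CLOSURE_INIT(closure, on_timer, NULL, grpc_schedule_on_exec_ctx);
  /* deadline is absolute, in milliseconds on the exec_ctx clock; no
     gpr_convert_clock_type and no separate 'now' parameter any more */
  grpc_millis deadline = grpc_exec_ctx_now(&exec_ctx) + 100;
  grpc_timer_init(&exec_ctx, timer, deadline, closure);
  grpc_exec_ctx_finish(&exec_ctx);
}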