Diffstat (limited to 'src/core/lib')
53 files changed, 456 insertions, 517 deletions
diff --git a/src/core/lib/support/backoff.c b/src/core/lib/backoff/backoff.c index 6dc0df473b..e964af8f39 100644 --- a/src/core/lib/support/backoff.c +++ b/src/core/lib/backoff/backoff.c @@ -16,13 +16,14 @@ * */ -#include "src/core/lib/support/backoff.h" +#include "src/core/lib/backoff/backoff.h" #include <grpc/support/useful.h> -void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, - double multiplier, double jitter, - int64_t min_timeout_millis, int64_t max_timeout_millis) { +void grpc_backoff_init(grpc_backoff *backoff, + grpc_millis initial_connect_timeout, double multiplier, + double jitter, grpc_millis min_timeout_millis, + grpc_millis max_timeout_millis) { backoff->initial_connect_timeout = initial_connect_timeout; backoff->multiplier = multiplier; backoff->jitter = jitter; @@ -31,11 +32,11 @@ void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; } -gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) { +grpc_millis grpc_backoff_begin(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff) { backoff->current_timeout_millis = backoff->initial_connect_timeout; const int64_t first_timeout = GPR_MAX(backoff->current_timeout_millis, backoff->min_timeout_millis); - return gpr_time_add(now, gpr_time_from_millis(first_timeout, GPR_TIMESPAN)); + return grpc_exec_ctx_now(exec_ctx) + first_timeout; } /* Generate a random number between 0 and 1. */ @@ -44,7 +45,7 @@ static double generate_uniform_random_number(uint32_t *rng_state) { return *rng_state / (double)((uint32_t)1 << 31); } -gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { +grpc_millis grpc_backoff_step(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff) { const double new_timeout_millis = backoff->multiplier * (double)backoff->current_timeout_millis; backoff->current_timeout_millis = @@ -58,15 +59,15 @@ gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { backoff->current_timeout_millis = (int64_t)((double)(backoff->current_timeout_millis) + jitter); - const gpr_timespec current_deadline = gpr_time_add( - now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); + const grpc_millis current_deadline = + grpc_exec_ctx_now(exec_ctx) + backoff->current_timeout_millis; - const gpr_timespec min_deadline = gpr_time_add( - now, gpr_time_from_millis(backoff->min_timeout_millis, GPR_TIMESPAN)); + const grpc_millis min_deadline = + grpc_exec_ctx_now(exec_ctx) + backoff->min_timeout_millis; - return gpr_time_max(current_deadline, min_deadline); + return GPR_MAX(current_deadline, min_deadline); } -void gpr_backoff_reset(gpr_backoff *backoff) { +void grpc_backoff_reset(grpc_backoff *backoff) { backoff->current_timeout_millis = backoff->initial_connect_timeout; } diff --git a/src/core/lib/support/backoff.h b/src/core/lib/backoff/backoff.h index 6e0cc3a4b6..c0798bbe5b 100644 --- a/src/core/lib/support/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -16,41 +16,43 @@ * */ -#ifndef GRPC_CORE_LIB_SUPPORT_BACKOFF_H -#define GRPC_CORE_LIB_SUPPORT_BACKOFF_H +#ifndef GRPC_CORE_LIB_BACKOFF_BACKOFF_H +#define GRPC_CORE_LIB_BACKOFF_BACKOFF_H -#include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { /// const: how long to wait after the first failure before retrying - int64_t initial_connect_timeout; + grpc_millis initial_connect_timeout; /// const: factor with which to multiply backoff after a failed retry double multiplier; /// const: amount to 
randomize backoffs double jitter; /// const: minimum time between retries in milliseconds - int64_t min_timeout_millis; + grpc_millis min_timeout_millis; /// const: maximum time between retries in milliseconds - int64_t max_timeout_millis; + grpc_millis max_timeout_millis; /// random number generator uint32_t rng_state; /// current retry timeout in milliseconds int64_t current_timeout_millis; -} gpr_backoff; +} grpc_backoff; /// Initialize backoff machinery - does not need to be destroyed -void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, - double multiplier, double jitter, - int64_t min_timeout_millis, int64_t max_timeout_millis); +void grpc_backoff_init(grpc_backoff *backoff, + grpc_millis initial_connect_timeout, double multiplier, + double jitter, grpc_millis min_timeout_millis, + grpc_millis max_timeout_millis); /// Begin retry loop: returns a timespec for the NEXT retry -gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now); +grpc_millis grpc_backoff_begin(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff); /// Step a retry loop: returns a timespec for the NEXT retry -gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now); -/// Reset the backoff, so the next gpr_backoff_step will be a gpr_backoff_begin +grpc_millis grpc_backoff_step(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff); +/// Reset the backoff, so the next grpc_backoff_step will be a +/// grpc_backoff_begin /// instead -void gpr_backoff_reset(gpr_backoff *backoff); +void grpc_backoff_reset(grpc_backoff *backoff); -#endif /* GRPC_CORE_LIB_SUPPORT_BACKOFF_H */ +#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */ diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index ae1cac31f7..e616a51024 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -70,7 +70,7 @@ typedef struct { grpc_call_context_element *context; grpc_slice path; gpr_timespec start_time; - gpr_timespec deadline; + grpc_millis deadline; gpr_arena *arena; grpc_call_combiner *call_combiner; } grpc_call_element_args; diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 1753da5721..b27ee37e5b 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -232,7 +232,7 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data) { gpr_mu_lock(&mgr->mu); GPR_ASSERT(mgr->index == 0); @@ -255,9 +255,7 @@ void grpc_handshake_manager_do_handshake( gpr_ref(&mgr->refs); GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &mgr->deadline_timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, deadline, &mgr->on_timeout); // Start first handshaker, which also owns a ref. 
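The backoff machinery above now speaks grpc_millis and derives "now" from the exec_ctx instead of taking a gpr_timespec argument. A minimal sketch of how a caller might drive the renamed API follows; retry_state, on_retry and the chosen constants are hypothetical, only the grpc_backoff_* and grpc_timer_init signatures come from this change.

#include <grpc/support/time.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/iomgr/timer.h"

typedef struct {
  grpc_backoff backoff;
  grpc_timer retry_timer;
  grpc_closure on_retry; /* hypothetical closure, assumed initialized elsewhere
                            with GRPC_CLOSURE_INIT */
} retry_state;

static void arm_first_retry(grpc_exec_ctx *exec_ctx, retry_state *rs) {
  grpc_backoff_init(&rs->backoff, 1000 /* initial_connect_timeout (ms) */,
                    1.6 /* multiplier */, 0.2 /* jitter */,
                    100 /* min_timeout_millis */,
                    120 * GPR_MS_PER_SEC /* max_timeout_millis */);
  /* grpc_backoff_begin returns an absolute grpc_millis deadline computed from
     grpc_exec_ctx_now(), so it can be handed straight to grpc_timer_init. */
  grpc_millis deadline = grpc_backoff_begin(exec_ctx, &rs->backoff);
  grpc_timer_init(exec_ctx, &rs->retry_timer, deadline, &rs->on_retry);
}

static void arm_next_retry(grpc_exec_ctx *exec_ctx, retry_state *rs) {
  grpc_timer_init(exec_ctx, &rs->retry_timer,
                  grpc_backoff_step(exec_ctx, &rs->backoff), &rs->on_retry);
}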
gpr_ref(&mgr->refs); bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index eb9a59bd08..09b4a27c1c 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -145,7 +145,7 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data); /// Add \a mgr to the server side list of all pending handshake managers, the diff --git a/src/core/lib/compression/stream_compression.h b/src/core/lib/compression/stream_compression.h index 844dff81a3..0daaa9e655 100644 --- a/src/core/lib/compression/stream_compression.h +++ b/src/core/lib/compression/stream_compression.h @@ -87,4 +87,4 @@ grpc_stream_compression_context *grpc_stream_compression_context_create( void grpc_stream_compression_context_destroy( grpc_stream_compression_context *ctx); -#endif +#endif /* GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_H */ diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 84cc39604c..7d768e9556 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -44,7 +44,7 @@ typedef struct { grpc_endpoint *ep; char *host; char *ssl_host_override; - gpr_timespec deadline; + grpc_millis deadline; int have_read_byte; const grpc_httpcli_handshaker *handshaker; grpc_closure *on_done; @@ -65,7 +65,7 @@ static grpc_httpcli_post_override g_post_override = NULL; static void plaintext_handshake(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint, const char *host, - gpr_timespec deadline, + grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { @@ -240,7 +240,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_done, + grpc_millis deadline, grpc_closure *on_done, grpc_httpcli_response *response, const char *name, grpc_slice request_text) { internal_request *req = @@ -278,9 +278,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, - const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_done, - grpc_httpcli_response *response) { + const grpc_httpcli_request *request, grpc_millis deadline, + grpc_closure *on_done, grpc_httpcli_response *response) { char *name; if (g_get_override && g_get_override(exec_ctx, request, deadline, on_done, response)) { @@ -298,7 +297,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_closure *on_done, + grpc_millis deadline, grpc_closure *on_done, grpc_httpcli_response *response) { char *name; if (g_post_override && diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 809618695e..a3baf512a3 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -42,7 +42,7 @@ typedef struct 
grpc_httpcli_context { typedef struct { const char *default_port; void (*handshake)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint, - const char *host, gpr_timespec deadline, + const char *host, grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)); } grpc_httpcli_handshaker; @@ -83,8 +83,8 @@ void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, - const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_complete, + const grpc_httpcli_request *request, grpc_millis deadline, + grpc_closure *on_complete, grpc_httpcli_response *response); /* Asynchronously perform a HTTP POST. @@ -106,18 +106,18 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_closure *on_complete, + grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); /* override functions return 1 if they handled the request, 0 otherwise */ typedef int (*grpc_httpcli_get_override)(grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request, - gpr_timespec deadline, + grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); typedef int (*grpc_httpcli_post_override)( grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request, - const char *body_bytes, size_t body_size, gpr_timespec deadline, + const char *body_bytes, size_t body_size, grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); void grpc_httpcli_set_override(grpc_httpcli_get_override get, diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 97c2886525..37ee08783e 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -155,7 +155,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, const char *host, - gpr_timespec deadline, + grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { on_done_closure *c = gpr_malloc(sizeof(*c)); diff --git a/src/core/lib/support/block_annotate.h b/src/core/lib/iomgr/block_annotate.h index 8e3ef7df65..cbcb5d92f0 100644 --- a/src/core/lib/support/block_annotate.h +++ b/src/core/lib/iomgr/block_annotate.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H -#define GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H +#ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H +#define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H #ifdef __cplusplus extern "C" { @@ -47,9 +47,13 @@ void gpr_thd_end_blocking_region(); #define GRPC_SCHEDULING_START_BLOCKING_REGION \ do { \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION \ - do { \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + grpc_exec_ctx_invalidate_now((ec)); \ } while (0) #endif -#endif /* GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H */ +#endif /* GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 6946a2cbf5..7fa2899092 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ 
b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -25,6 +25,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -40,12 +41,12 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; @@ -555,22 +556,17 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_END("pollset_shutdown", 0); } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) { + return INT_MAX; + } else if (delta < 0) { return 0; + } else { + return (int)delta; } - - static const gpr_timespec round_up = { - .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1}; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? millis : 1; } /* Process the epoll events found by do_epoll_wait() function. @@ -627,11 +623,11 @@ static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx, (i.e the designated poller thread) will be calling this function. 
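The pollers now receive a grpc_millis deadline and convert it into the int timeout that poll/epoll expect, reading "now" from the exec_ctx and invalidating the cached value once a blocking region ends. A hedged sketch of that pattern, with a hypothetical blocking_step() wrapper standing in for the real epoll path:

#include <limits.h>
#include "src/core/lib/iomgr/exec_ctx.h"

static int deadline_to_poll_timeout(grpc_exec_ctx *exec_ctx,
                                    grpc_millis deadline) {
  if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; /* block indefinitely */
  grpc_millis delta = deadline - grpc_exec_ctx_now(exec_ctx);
  if (delta < 0) return 0;             /* already expired: non-blocking poll */
  if (delta > INT_MAX) return INT_MAX; /* clamp to the int epoll_wait takes */
  return (int)delta;
}

static void blocking_step(grpc_exec_ctx *exec_ctx, grpc_millis deadline) {
  int timeout_ms = deadline_to_poll_timeout(exec_ctx, deadline);
  /* ... epoll_wait()/poll() with timeout_ms would go here ... */
  (void)timeout_ms;
  /* the cached clock is stale after blocking; invalidate it so the next
     grpc_exec_ctx_now() call re-reads the monotonic clock */
  grpc_exec_ctx_invalidate_now(exec_ctx);
}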
So there is no need for any synchronization when accesing fields in g_epoll_set */ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("do_epoll_wait", 0); int r; - int timeout = poll_deadline_to_millis_timeout(deadline, now); + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (timeout != 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; } @@ -641,7 +637,7 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -657,9 +653,10 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { GPR_TIMER_BEGIN("begin_worker", 0); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -744,14 +741,15 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->shutting_down); } - if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && + if (gpr_cv_wait(&worker->cv, &pollset->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) && worker->kick_state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); } } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -926,7 +924,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; @@ -937,7 +935,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } - if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, ps, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!ps->shutting_down); @@ -960,8 +958,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, designated poller */ if (gpr_atm_acq_load(&g_epoll_set.cursor) == gpr_atm_acq_load(&g_epoll_set.num_events)) { - append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline), - err_desc); + append_error(&error, do_epoll_wait(exec_ctx, ps, deadline), err_desc); } append_error(&error, process_epoll_events(exec_ctx, ps), err_desc); diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index df69025f1a..91164de13e 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -25,6 +25,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -38,7 +39,7 
@@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h" #include "src/core/lib/iomgr/lockfree_event.h" @@ -46,20 +47,18 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" /******************************************************************************* * Polling object */ - typedef enum { PO_POLLING_GROUP, PO_POLLSET_SET, PO_POLLSET, - PO_FD, /* ordering is important: we always want to lock pollsets before fds: - this guarantees that using an fd as a pollable is safe */ - PO_EMPTY_POLLABLE, + PO_FD, + /* ordering is important: we always want to lock pollsets before fds: + this guarantees that using an fd as a pollable is safe */ PO_EMPTY_POLLABLE, PO_COUNT } polling_obj_type; @@ -684,29 +683,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &pollset->pollable.po.mu; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - - static const gpr_timespec round_up = { - .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1}; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? 
millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -801,9 +787,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable_t *p, gpr_timespec now, - gpr_timespec deadline) { - int timeout = poll_deadline_to_millis_timeout(deadline, now); + pollable_t *p, grpc_millis deadline) { + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); @@ -820,7 +805,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -875,9 +860,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root, } /* Return true if this thread should poll */ -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { bool do_poll = true; if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -900,10 +886,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->pollable->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, worker->pollable, worker, - poll_deadline_to_millis_timeout(deadline, *now)); + poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { + if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, worker->pollable, worker); @@ -926,7 +913,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_mu_lock(&pollset->pollable.po.mu); gpr_mu_lock(&worker->pollable->po.mu); } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } return do_poll && pollset->shutdown_closure == NULL && @@ -957,14 +944,13 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 - ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", - pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, - deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller, - pollset->root_worker); + gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR + " deadline=%" PRIdPTR " kwp=%d root_worker=%p", + pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline, + pollset->kicked_without_poller, pollset->root_worker); } grpc_error *error = GRPC_ERROR_NONE; static const 
char *err_desc = "pollset_work"; @@ -975,7 +961,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->current_pollable != &pollset->pollable) { gpr_mu_lock(&pollset->current_pollable->po.mu); } - if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); @@ -985,8 +971,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_mu_unlock(&pollset->pollable.po.mu); if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, - now, deadline), + append_error(&error, + pollset_epoll(exec_ctx, pollset, worker.pollable, deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index b88c3ba111..b612df50ea 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -25,6 +25,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <signal.h> @@ -40,13 +41,13 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -1088,30 +1089,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutdown_done = NULL; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); - return millis >= 1 ? 
millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -1241,7 +1228,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (ep_rv < 0) { if (errno != EINTR) { gpr_asprintf(&err_msg, @@ -1308,10 +1295,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("pollset_work", 0); grpc_error *error = GRPC_ERROR_NONE; - int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); + int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline); sigset_t new_mask; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index bcf1d9001b..bf5193de5c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <string.h> #include <sys/socket.h> @@ -37,12 +38,11 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/murmur_hash.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -50,7 +50,6 @@ /******************************************************************************* * FD declarations */ - typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; @@ -200,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -872,7 +871,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) { static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; @@ -941,7 +940,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd_watcher *watchers; struct pollfd *pfds; - timeout = poll_deadline_to_millis_timeout(deadline, now); + timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -987,7 +986,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GRPC_SCHEDULING_START_BLOCKING_REGION; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); r = grpc_poll_function(pfds, 
pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); @@ -1064,13 +1063,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + deadline = 0; } keep_polling = 1; } - if (keep_polling) { - now = gpr_now(now.clock_type); - } } gpr_tls_set(&g_current_thread_poller, 0); if (added_worker) { @@ -1122,21 +1118,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; + if (deadline == 0) return 0; + grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + if (n < 0) return 0; + if (n > INT_MAX) return -1; + return (int)n; } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index d881e2d4dd..ffa8beb76c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -205,9 +205,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); + grpc_pollset_worker **worker, + grpc_millis deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline); } grpc_error *grpc_pollset_kick(grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1108e46ef8..dfc82fc8c6 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -52,8 +52,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, + grpc_millis deadline); grpc_error *(*pollset_kick)(grpc_pollset *pollset, grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 41c69add17..3d17afcb8f 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -104,9 +104,69 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -void grpc_exec_ctx_global_init(void) {} +static 
gpr_timespec + g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the + // last enum value in + // gpr_clock_type + +void grpc_exec_ctx_global_init(void) { + for (int i = 0; i < GPR_TIMESPAN; i++) { + g_start_time[i] = gpr_now((gpr_clock_type)i); + } + // allows uniform treatment in conversion functions + g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN); +} + void grpc_exec_ctx_global_shutdown(void) {} +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { + exec_ctx->now_is_valid = false; +} + +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, + gpr_clock_type clock_type) { + if (clock_type == GPR_TIMESPAN) { + return gpr_time_from_millis(millis, GPR_TIMESPAN); + } + return gpr_time_add(g_start_time[clock_type], + gpr_time_from_millis(millis, GPR_TIMESPAN)); +} + +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { + return timespec_to_atm_round_down(ts); +} + +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { + return timespec_to_atm_round_up(ts); +} + static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index c89792c8c4..6220e41a4f 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -19,10 +19,14 @@ #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H +#include <grpc/support/atm.h> #include <grpc/support/cpu.h> + #include "src/core/lib/iomgr/closure.h" -/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ +typedef gpr_atm grpc_millis; + +#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. 
*/ @@ -66,6 +70,9 @@ struct grpc_exec_ctx { unsigned starting_cpu; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); + + bool now_is_valid; + grpc_millis now; }; /* initializer for grpc_exec_ctx: @@ -73,7 +80,7 @@ struct grpc_exec_ctx { #define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ { \ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, gpr_cpu_current_cpu(), \ - finish_check_arg, finish_check \ + finish_check_arg, finish_check, false, 0 \ } /* initialize an execution context at the top level of an API call into grpc @@ -106,4 +113,10 @@ void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx); +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx); +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); + #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 1feea6d628..7e0c7bf5ce 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -48,7 +48,7 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); grpc_executor_init(exec_ctx); - grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_list_init(exec_ctx); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_network_status_init(); @@ -95,8 +95,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == - GRPC_TIMERS_FIRED) { + exec_ctx->now_is_valid = true; + exec_ctx->now = GRPC_MILLIS_INF_FUTURE; + if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c index 0b4d41ea4b..5cb4099ea4 100644 --- a/src/core/lib/iomgr/load_file.c +++ b/src/core/lib/iomgr/load_file.c @@ -25,7 +25,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/support/string.h" grpc_error *grpc_load_file(const char *filename, int add_null_terminator, @@ -73,6 +73,6 @@ end: GRPC_ERROR_UNREF(error); error = error_out; } - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; return error; } diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index a609a3877a..49d4251894 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -71,8 +71,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); pollset lock */ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) GRPC_MUST_USE_RESULT; + grpc_pollset_worker **worker, + grpc_millis deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is non-NULL, then kick that worker. 
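exec_ctx.h now defines grpc_millis together with conversion helpers, so an absolute gpr_timespec deadline from the public API can be collapsed to a millisecond count once on entry and expanded back only where a timespec is still required (condition variables, for instance). A small illustrative sketch, assuming the usual gpr_cv/gpr_mu types; both helper functions are hypothetical:

#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/exec_ctx.h"

static grpc_millis deadline_from_api(gpr_timespec api_deadline) {
  /* round up so the conversion can never shorten a caller's deadline */
  return grpc_timespec_to_millis_round_up(api_deadline);
}

static void wait_until_deadline(gpr_cv *cv, gpr_mu *mu, grpc_millis deadline) {
  /* gpr_cv_wait still takes a gpr_timespec, so convert back against the
     realtime clock, as the poller changes above do */
  gpr_cv_wait(cv, mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME));
}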
*/ diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 082e3b7947..c84e0b54e3 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -33,10 +33,10 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_error *blocking_resolve_address_impl( @@ -81,7 +81,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { /* Retry if well-known service name is recognized */ @@ -90,7 +90,7 @@ static grpc_error *blocking_resolve_address_impl( if (strcmp(port, svc[i][0]) == 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; break; } } diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 0cb0029f4e..305930de44 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -33,10 +33,10 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" typedef struct { @@ -86,7 +86,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo"); goto done; diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 6c9e51ae84..c04fb8bc07 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -35,6 +35,6 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline); + grpc_millis deadline); #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 39dbb506e2..bb348bdf72 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -244,7 +244,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -325,9 +325,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - 
&ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -341,14 +339,14 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 7e271294fd..7fcaef7679 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -135,13 +135,11 @@ static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); + grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC; GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx); - GRPC_LOG_IF_ERROR("backup_poller:pollset_work", - grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, - now, deadline)); + GRPC_LOG_IF_ERROR( + "backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, deadline)); gpr_mu_unlock(p->pollset_mu); /* last "uncovered" notification is the ref that keeps us polling, if we get * there try a cas to release it */ diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index ac392f87fe..e51cb44fbc 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -41,8 +41,7 @@ typedef struct grpc_timer grpc_timer; application callback is also responsible for maintaining information about when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now); + grpc_millis deadline, grpc_closure *closure); /* Initialize *timer without setting it. This can later be passed through the regular init or cancel */ @@ -92,8 +91,8 @@ typedef enum { with high probability at least one thread in the system will see an update at any time slice. 
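grpc_timer_check (whose updated signature appears just below) now takes its notion of "now" from the exec_ctx and reports the next pending deadline through a grpc_millis out-parameter. A hedged sketch of a check loop built against that shape; the surrounding function is illustrative, not the timer manager's actual code:

#include "src/core/lib/iomgr/timer.h"

static grpc_millis check_timers_once(grpc_exec_ctx *exec_ctx) {
  grpc_millis next = GRPC_MILLIS_INF_FUTURE;
  switch (grpc_timer_check(exec_ctx, &next)) {
    case GRPC_TIMERS_FIRED:
      /* expired timer closures were scheduled on the exec_ctx; run them */
      grpc_exec_ctx_flush(exec_ctx);
      break;
    case GRPC_TIMERS_CHECKED_AND_EMPTY:
      /* nothing fired; 'next' holds the earliest pending deadline, or
         GRPC_MILLIS_INF_FUTURE if no timers are pending */
      break;
    default:
      /* another thread is already checking; it will pick up 'next' */
      break;
  }
  return next;
}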
*/ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next); -void grpc_timer_list_init(gpr_timespec now); + grpc_millis *next); +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); /* Consume a kick issued by grpc_kick_poller */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index c08bb525b7..c69084d680 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -99,9 +99,6 @@ static struct shared_mutables g_shared_mutables = { .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, }; -static gpr_clock_type g_clock_type; -static gpr_timespec g_start_time; - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -114,51 +111,18 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm *next, grpc_error *error); -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - -static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = GPR_MS_PER_SEC * (double)ts.tv_sec + - (double)ts.tv_nsec / GPR_NS_PER_MS + - (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = - GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_timespec atm_to_timespec(gpr_atm x) { - return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); -} - static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? 
saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_timer_list_init(gpr_timespec now) { +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { uint32_t i; g_shared_mutables.initialized = true; gpr_mu_init(&g_shared_mutables.mu); - g_clock_type = now.clock_type; - g_start_time = now; - g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); grpc_register_tracer(&grpc_timer_trace); @@ -193,10 +157,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { g_shared_mutables.initialized = false; } -static double ts_to_dbl(gpr_timespec ts) { - return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; -} - /* returns true if the first element in the list */ static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; @@ -237,20 +197,16 @@ static void note_deadline_change(timer_shard *shard) { void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { int is_first_timer = 0; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; - GPR_ASSERT(deadline.clock_type == g_clock_type); - GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline); + timer->deadline = deadline; if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR - "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", - timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec, - now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); + gpr_log(GPR_DEBUG, + "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, + deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); } if (!g_shared_mutables.initialized) { @@ -263,7 +219,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_mu_lock(&shard->mu); timer->pending = true; - if (gpr_time_cmp(deadline, now) <= 0) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline <= now) { timer->pending = false; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); @@ -272,8 +229,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } grpc_time_averaged_stats_add_sample(&shard->stats, - ts_to_dbl(gpr_time_sub(deadline, now))); - if (deadline_atm < shard->queue_deadline_cap) { + (double)(deadline - now) / 1000.0); + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -304,12 +261,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_log(GPR_DEBUG, " .. 
old shard min_deadline=%" PRIdPTR, shard->min_deadline); } - if (deadline_atm < shard->min_deadline) { + if (deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline_atm; + shard->min_deadline = deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) { - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm); + if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline); grpc_kick_poller(); } } @@ -408,8 +365,9 @@ static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { } if (timer->deadline > now) return NULL; if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer, - now - timer->deadline); + gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler", + timer, now - timer->deadline, + timer->closure->scheduler->vtable->name); } timer->pending = false; grpc_timer_heap_pop(&shard->heap); @@ -430,6 +388,10 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR, + (int)(shard - g_shards), n); + } return n; } @@ -502,29 +464,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { // prelude - GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm_round_down(now); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ - gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); - if (now_atm < min_timer) { + grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { if (next != NULL) { - *next = - atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); + *next = GPR_MIN(*next, min_timer); } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, - "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, - now_atm, min_timer); + "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, + min_timer); } return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + now != GRPC_MILLIS_INF_FUTURE ? 
GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); @@ -534,34 +494,24 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, timespec_to_atm_round_down(*next)); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR - "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, - now.tv_sec, now.tv_nsec, now_atm, next_str, - gpr_tls_get(&g_last_seen_min_timer), + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR + " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } // actual code - grpc_timer_check_result r; - gpr_atm next_atm; - if (next == NULL) { - r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); - } else { - next_atm = timespec_to_atm_round_down(*next); - r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); - *next = atm_to_timespec(next_atm); - } + grpc_timer_check_result r = + run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, next_atm); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 04ca44563d..f5dbe247ad 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -52,7 +52,7 @@ static bool g_kicked; static bool g_has_timed_waiter; // the deadline of the current timed waiter thread (only relevant if // g_has_timed_waiter is true) -static gpr_timespec g_timed_waiter_deadline; +static grpc_millis g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -96,9 +96,8 @@ static void start_timer_thread_and_unlock(void) { void grpc_timer_manager_tick() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_timer_check(&exec_ctx, now, &next); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_timer_check(&exec_ctx, &next); grpc_exec_ctx_finish(&exec_ctx); } @@ -121,6 +120,9 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { gpr_mu_unlock(&g_mu); } // without our lock, flush the exec_ctx + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "flush exec_ctx"); + } grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&g_mu); // garbage collect any threads hanging out that are dead @@ -133,8 +135,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { // wait until 'next' (or forever if there is already a timed waiter in the pool) // returns true if the thread should continue executing (false if it should // shutdown) -static bool wait_until(gpr_timespec next) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); +static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) { gpr_mu_lock(&g_mu); // if we're not threaded anymore, leave if (!g_threaded) { @@ -168,30 +169,29 @@ static bool 
wait_until(gpr_timespec next) { unless their 'next' is earlier than the current timed-waiter's deadline (in which case the thread with earlier 'next' takes over as the new timed waiter) */ - if (gpr_time_cmp(next, inf_future) != 0) { - if (!g_has_timed_waiter || - (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + if (next != GRPC_MILLIS_INF_FUTURE) { + if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) { my_timed_waiter_generation = ++g_timed_waiter_generation; g_has_timed_waiter = true; g_timed_waiter_deadline = next; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = - gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); + gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", + wait_time); } } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; } } if (GRPC_TRACER_ON(grpc_timer_check_trace) && - gpr_time_cmp(next, inf_future) == 0) { + next == GRPC_MILLIS_INF_FUTURE) { gpr_log(GPR_DEBUG, "sleep until kicked"); } - gpr_cv_wait(&g_cv_wait, &g_mu, next); + gpr_cv_wait(&g_cv_wait, &g_mu, + grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME)); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", @@ -203,7 +203,7 @@ static bool wait_until(gpr_timespec next) { // there's work to do after checking timers (code above) if (my_timed_waiter_generation == g_timed_waiter_generation) { g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; } } @@ -219,12 +219,11 @@ static bool wait_until(gpr_timespec next) { } static void timer_main_loop(grpc_exec_ctx *exec_ctx) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = inf_future; - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_exec_ctx_invalidate_now(exec_ctx); // check timer state, updates next to the next time to run a check - switch (grpc_timer_check(exec_ctx, now, &next)) { + switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: run_some_timers(exec_ctx); break; @@ -241,10 +240,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: - if (!wait_until(next)) { + if (!wait_until(exec_ctx, next)) { return; } break; @@ -300,7 +299,7 @@ void grpc_timer_manager_init(void) { g_completed_threads = NULL; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; start_threads(); } @@ -347,7 +346,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index a2a8e289ee..c83ecd088d 100644 --- 
a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -95,7 +95,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { /* The http call is local. If it takes more than one sec, it is for sure not on compute engine. */ - gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); + grpc_millis max_detection_delay = GPR_MS_PER_SEC; grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(pollset, &g_polling_mu); @@ -114,7 +114,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { grpc_resource_quota_create("google_default_credentials"); grpc_httpcli_get( exec_ctx, &context, &detector.pollent, resource_quota, &request, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), + grpc_exec_ctx_now(exec_ctx) + max_detection_delay, GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector, grpc_schedule_on_exec_ctx), &detector.response); @@ -131,8 +131,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { "pollset_work", grpc_pollset_work(exec_ctx, grpc_polling_entity_pollset(&detector.pollent), - &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + &worker, GRPC_MILLIS_INF_FUTURE))) { detector.is_done = 1; detector.success = 0; } diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index a27284bc50..7c24af86e8 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -380,7 +380,7 @@ void verifier_cb_ctx_destroy(grpc_exec_ctx *exec_ctx, verifier_cb_ctx *ctx) { gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN}; /* Max delay defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN}; +grpc_millis grpc_jwt_verifier_max_delay = 60 * GPR_MS_PER_SEC; typedef struct { char *email_domain; @@ -707,7 +707,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, grpc_resource_quota_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), + grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay, GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); @@ -833,10 +833,10 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, extreme memory pressure. 
*/ grpc_resource_quota *resource_quota = grpc_resource_quota_create("jwt_verifier"); - grpc_httpcli_get( - exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - http_cb, &ctx->responses[rsp_idx]); + grpc_httpcli_get(exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, + resource_quota, &req, + grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay, + http_cb, &ctx->responses[rsp_idx]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); gpr_free(req.host); gpr_free(req.http.path); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h index 8fac452d4e..3a3261a780 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.h +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h @@ -81,7 +81,7 @@ typedef struct { /* Globals to control the verifier. Not thread-safe. */ extern gpr_timespec grpc_jwt_verifier_clock_skew; -extern gpr_timespec grpc_jwt_verifier_max_delay; +extern grpc_millis grpc_jwt_verifier_max_delay; /* The verifier can be created with some custom mappings to help with key discovery in the case where the issuer is an email address. diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index 10b270c49c..5fe1f71f47 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -117,7 +117,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx, grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( grpc_exec_ctx *exec_ctx, const grpc_http_response *response, - grpc_mdelem *token_md, gpr_timespec *token_lifetime) { + grpc_mdelem *token_md, grpc_millis *token_lifetime) { char *null_terminated_body = NULL; char *new_access_token = NULL; grpc_credentials_status status = GRPC_CREDENTIALS_OK; @@ -183,9 +183,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response( } gpr_asprintf(&new_access_token, "%s %s", token_type->value, access_token->value); - token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10); - token_lifetime->tv_nsec = 0; - token_lifetime->clock_type = GPR_TIMESPAN; + *token_lifetime = strtol(expires_in->value, NULL, 10) * GPR_MS_PER_SEC; if (!GRPC_MDISNULL(*token_md)) GRPC_MDELEM_UNREF(exec_ctx, *token_md); *token_md = grpc_mdelem_from_slices( exec_ctx, @@ -214,7 +212,7 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)r->creds; grpc_mdelem access_token_md = GRPC_MDNULL; - gpr_timespec token_lifetime; + grpc_millis token_lifetime; grpc_credentials_status status = grpc_oauth2_token_fetcher_credentials_parse_server_response( exec_ctx, &r->response, &access_token_md, &token_lifetime); @@ -222,10 +220,9 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&c->mu); c->token_fetch_pending = false; c->access_token_md = GRPC_MDELEM_REF(access_token_md); - c->token_expiration = - status == GRPC_CREDENTIALS_OK - ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime) - : gpr_inf_past(GPR_CLOCK_REALTIME); + c->token_expiration = status == GRPC_CREDENTIALS_OK + ? 
grpc_exec_ctx_now(exec_ctx) + token_lifetime + : 0; grpc_oauth2_pending_get_request_metadata *pending_request = c->pending_requests; c->pending_requests = NULL; @@ -260,14 +257,12 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)creds; // Check if we can use the cached token. - gpr_timespec refresh_threshold = gpr_time_from_seconds( - GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); + grpc_millis refresh_threshold = + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS * GPR_MS_PER_SEC; grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (gpr_time_cmp( - gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)), - refresh_threshold) > 0)) { + (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { @@ -296,10 +291,10 @@ static bool oauth2_token_fetcher_get_request_metadata( gpr_mu_unlock(&c->mu); if (start_fetch) { grpc_call_credentials_ref(creds); - c->fetch_func( - exec_ctx, grpc_credentials_metadata_request_create(creds), - &c->httpcli_context, &c->pollent, on_oauth2_token_fetcher_http_response, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), refresh_threshold)); + c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds), + &c->httpcli_context, &c->pollent, + on_oauth2_token_fetcher_http_response, + grpc_exec_ctx_now(exec_ctx) + refresh_threshold); } return false; } @@ -340,7 +335,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); + c->token_expiration = 0; c->fetch_func = fetch_func; c->pollent = grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create()); @@ -358,7 +353,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = { static void compute_engine_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent, - grpc_iomgr_cb_func response_cb, gpr_timespec deadline) { + grpc_iomgr_cb_func response_cb, grpc_millis deadline) { grpc_http_header header = {"Metadata-Flavor", "Google"}; grpc_httpcli_request request; memset(&request, 0, sizeof(grpc_httpcli_request)); @@ -409,7 +404,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = { static void refresh_token_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent, - grpc_iomgr_cb_func response_cb, gpr_timespec deadline) { + grpc_iomgr_cb_func response_cb, grpc_millis deadline) { grpc_google_refresh_token_credentials *c = (grpc_google_refresh_token_credentials *)metadata_req->creds; grpc_http_header header = {"Content-Type", diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index d9ad6691b8..5d9f24307f 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -57,7 +57,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *http_context, grpc_polling_entity *pollent, grpc_iomgr_cb_func cb, - gpr_timespec deadline); + 
grpc_millis deadline); typedef struct grpc_oauth2_pending_get_request_metadata { grpc_credentials_mdelem_array *md_array; @@ -70,7 +70,7 @@ typedef struct { grpc_call_credentials base; gpr_mu mu; grpc_mdelem access_token_md; - gpr_timespec token_expiration; + grpc_millis token_expiration; bool token_fetch_pending; grpc_oauth2_pending_get_request_metadata *pending_requests; grpc_httpcli_context httpcli_context; @@ -100,6 +100,6 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token( grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( grpc_exec_ctx *exec_ctx, const struct grpc_http_response *response, - grpc_mdelem *token_md, gpr_timespec *token_lifetime); + grpc_mdelem *token_md, grpc_millis *token_lifetime); #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */ diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c index 3ead40d807..ad7c5036da 100644 --- a/src/core/lib/support/time_posix.c +++ b/src/core/lib/support/time_posix.c @@ -30,7 +30,7 @@ #include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/iomgr/block_annotate.h" static struct timespec timespec_from_gpr(gpr_timespec gts) { struct timespec rv; @@ -159,7 +159,7 @@ void gpr_sleep_until(gpr_timespec until) { delta_ts = timespec_from_gpr(delta); GRPC_SCHEDULING_START_BLOCKING_REGION; ns_result = nanosleep(&delta_ts, NULL); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (ns_result == 0) { break; } diff --git a/src/core/lib/support/time_windows.c b/src/core/lib/support/time_windows.c index 40df3761c0..b8015c272f 100644 --- a/src/core/lib/support/time_windows.c +++ b/src/core/lib/support/time_windows.c @@ -28,7 +28,7 @@ #include <process.h> #include <sys/timeb.h> -#include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/support/time_precise.h" static LARGE_INTEGER g_start_time; @@ -94,7 +94,7 @@ void gpr_sleep_until(gpr_timespec until) { GPR_ASSERT((sleep_millis >= 0) && (sleep_millis <= INT_MAX)); GRPC_SCHEDULING_START_BLOCKING_REGION; Sleep((DWORD)sleep_millis); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; } } diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 7712f560b9..e3bcbb9d80 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -122,8 +122,7 @@ void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, GPR_ASSERT(grpc_cq_begin_op(cq, tag)); grpc_timer_init(&exec_ctx, &alarm->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index de5aa7b6be..ed5138006e 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -214,7 +214,7 @@ struct grpc_call { server, it's trailing metadata */ grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; int send_extra_metadata_count; - gpr_timespec send_deadline; + grpc_millis send_deadline; grpc_slice_buffer_stream sending_stream; @@ -281,7 +281,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error 
*error); -static void get_final_status(grpc_call *call, +static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details); @@ -369,11 +369,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } for (i = 0; i < 2; i++) { for (j = 0; j < 2; j++) { - call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE; } } - gpr_timespec send_deadline = - gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); + grpc_millis send_deadline = args->send_deadline; bool immediately_cancel = false; @@ -391,10 +390,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { - send_deadline = gpr_time_min( - gpr_convert_clock_type(send_deadline, - args->parent_call->send_deadline.clock_type), - args->parent_call->send_deadline); + send_deadline = GPR_MIN(send_deadline, args->parent_call->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ @@ -549,8 +545,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind"); } - get_final_status(c, set_status_value_directly, &c->final_info.final_status, - NULL); + get_final_status(exec_ctx, c, set_status_value_directly, + &c->final_info.final_status, NULL); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -736,13 +732,16 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, * FINAL STATUS CODE MANIPULATION */ -static bool get_final_status_from( - grpc_call *call, grpc_error *error, bool allow_ok_status, - void (*set_value)(grpc_status_code code, void *user_data), - void *set_value_user_data, grpc_slice *details) { +static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_error *error, bool allow_ok_status, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data, + grpc_slice *details) { grpc_status_code code; grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL); + grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice, + NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } @@ -754,7 +753,7 @@ static bool get_final_status_from( return true; } -static void get_final_status(grpc_call *call, +static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { @@ -779,8 +778,9 @@ static void get_final_status(grpc_call *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set && grpc_error_has_clear_grpc_status(status[i].error)) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details)) { + if (get_final_status_from(exec_ctx, call, status[i].error, + allow_ok_status != 0, set_value, + set_value_user_data, details)) { return; } } @@ -788,8 +788,9 @@ static void get_final_status(grpc_call *call, /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details)) { + if 
(get_final_status_from(exec_ctx, call, status[i].error, + allow_ok_status != 0, set_value, + set_value_user_data, details)) { return; } } @@ -1330,11 +1331,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } if (call->is_client) { - get_final_status(call, set_status_value_directly, + get_final_status(exec_ctx, call, set_status_value_directly, call->final_op.client.status, call->final_op.client.status_details); } else { - get_final_status(call, set_cancelled_value, + get_final_status(exec_ctx, call, set_cancelled_value, call->final_op.server.cancelled, NULL); } @@ -1611,11 +1612,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, validate_filtered_metadata(exec_ctx, bctl); GPR_TIMER_END("validate_filtered_metadata", 0); - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - call->send_deadline = - gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC); + if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) { + call->send_deadline = md->deadline; } } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index d537637cbb..be362c8000 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -49,7 +49,7 @@ typedef struct grpc_call_create_args { grpc_mdelem *add_initial_metadata; size_t add_initial_metadata_count; - gpr_timespec send_deadline; + grpc_millis send_deadline; } grpc_call_create_args; /* Create a new call based on \a args. diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 34548dac26..89760291c2 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -259,7 +259,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative, grpc_mdelem path_mdelem, - grpc_mdelem authority_mdelem, gpr_timespec deadline) { + grpc_mdelem authority_mdelem, grpc_millis deadline) { grpc_mdelem send_metadata[2]; size_t num_metadata = 0; @@ -305,7 +305,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, host != NULL ? 
grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_AUTHORITY, grpc_slice_ref_internal(*host)) : GRPC_MDNULL, - deadline); + grpc_timespec_to_millis_round_up(deadline)); grpc_exec_ctx_finish(&exec_ctx); return call; } @@ -313,7 +313,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_call *grpc_channel_create_pollset_set_call( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method, - const grpc_slice *host, gpr_timespec deadline, void *reserved) { + const grpc_slice *host, grpc_millis deadline, void *reserved) { GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( exec_ctx, channel, parent_call, propagation_mask, NULL, pollset_set, @@ -369,7 +369,8 @@ grpc_call *grpc_channel_create_registered_call( grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call *call = grpc_channel_create_call_internal( &exec_ctx, channel, parent_call, propagation_mask, completion_queue, NULL, - GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline); + GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), + grpc_timespec_to_millis_round_up(deadline)); grpc_exec_ctx_finish(&exec_ctx); return call; } diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 528bb868e2..827dd992b5 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -43,7 +43,7 @@ grpc_channel *grpc_channel_create_with_builder( grpc_call *grpc_channel_create_pollset_set_call( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method, - const grpc_slice *host, gpr_timespec deadline, void *reserved); + const grpc_slice *host, grpc_millis deadline, void *reserved); /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 4726503994..b0da5f2364 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -57,8 +57,7 @@ typedef struct { grpc_error *(*kick)(grpc_pollset *pollset, grpc_pollset_worker *specific_worker); grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, grpc_millis deadline); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); @@ -96,8 +95,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx, static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker, - gpr_timespec now, - gpr_timespec deadline) { + grpc_millis deadline) { non_polling_poller *npp = (non_polling_poller *)pollset; if (npp->shutdown) return GRPC_ERROR_NONE; non_polling_worker w; @@ -111,7 +109,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, w.next->prev = w.prev->next = &w; } w.kicked = false; - while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + gpr_timespec deadline_ts = + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME); + while (!npp->shutdown && !w.kicked && + !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; if (&w == npp->root) { npp->root = w.next; @@ -760,7 +761,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, 
grpc_completion_queue *cq, typedef struct { gpr_atm last_seen_things_queued_ever; grpc_completion_queue *cq; - gpr_timespec deadline; + grpc_millis deadline; grpc_cq_completion *stolen_completion; void *tag; /* for pluck */ bool first_loop; @@ -789,8 +790,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { return true; } } - return !a->first_loop && - gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); } #ifndef NDEBUG @@ -819,7 +819,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {} static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; - gpr_timespec now; cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -836,23 +835,21 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, dump_pending_tags(cq); - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cq, "next"); + grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cq, - .deadline = deadline, + .deadline = deadline_millis, .stolen_completion = NULL, .tag = NULL, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); - for (;;) { - gpr_timespec iteration_deadline = deadline; + grpc_millis iteration_deadline = deadline_millis; if (is_finished_arg.stolen_completion != NULL) { grpc_cq_completion *c = is_finished_arg.stolen_completion; @@ -879,7 +876,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, attempt at popping. 
Not doing this can potentially deadlock this thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { - iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + iteration_deadline = 0; } } @@ -900,8 +897,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, break; } - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -912,7 +909,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, gpr_mu_lock(cq->mu); cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - NULL, now, iteration_deadline); + NULL, iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { @@ -1046,8 +1043,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { } gpr_mu_unlock(cq->mu); } - return !a->first_loop && - gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); } static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, @@ -1056,7 +1052,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; - gpr_timespec now; cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -1075,15 +1070,14 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cq, "pluck"); gpr_mu_lock(cq->mu); + grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cq, - .deadline = deadline, + .deadline = deadline_millis, .stolen_completion = NULL, .tag = tag, .first_loop = true}; @@ -1135,8 +1129,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1144,10 +1138,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - &worker, now, deadline); + &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 6286f9159d..88e26cbeb7 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -74,7 +74,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, mdb->list.head = &calld->status; mdb->list.tail = &calld->details; mdb->list.count = 2; - mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + mdb->deadline = GRPC_MILLIS_INF_FUTURE; } static void lame_start_transport_stream_op_batch( diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 13ecc915ec..64e5ce78e7 100644 --- 
a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -137,7 +137,7 @@ struct call_data { bool host_set; grpc_slice path; grpc_slice host; - gpr_timespec deadline; + grpc_millis deadline; grpc_completion_queue *cq_new; @@ -492,11 +492,13 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_ASSERT(calld->path_set); rc->data.batch.details->host = grpc_slice_ref_internal(calld->host); rc->data.batch.details->method = grpc_slice_ref_internal(calld->path); - rc->data.batch.details->deadline = calld->deadline; + rc->data.batch.details->deadline = + grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_REALTIME); rc->data.batch.details->flags = calld->recv_initial_metadata_flags; break; case REGISTERED_CALL: - *rc->data.registered.deadline = calld->deadline; + *rc->data.registered.deadline = + grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_REALTIME); if (rc->data.registered.optional_payload) { *rc->data.registered.optional_payload = calld->payload; calld->payload = NULL; @@ -739,7 +741,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)ptr; call_data *calld = (call_data *)elem->call_data; - gpr_timespec op_deadline; + grpc_millis op_deadline; if (error == GRPC_ERROR_NONE) { GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != NULL); @@ -759,7 +761,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_REF(error); } op_deadline = calld->recv_initial_metadata->deadline; - if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { + if (op_deadline != GRPC_MILLIS_INF_FUTURE) { calld->deadline = op_deadline; } if (calld->host_set && calld->path_set) { @@ -833,7 +835,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, memset(&args, 0, sizeof(args)); args.channel = chand->channel; args.server_transport_data = transport_server_data; - args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + args.send_deadline = GRPC_MILLIS_INF_FUTURE; grpc_call *call; grpc_error *error = grpc_call_create(exec_ctx, &args, &call); grpc_call_element *elem = @@ -881,7 +883,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; memset(calld, 0, sizeof(call_data)); - calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + calld->deadline = GRPC_MILLIS_INF_FUTURE; calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c index 5e3920b627..2e3b61b7ab 100644 --- a/src/core/lib/transport/error_utils.c +++ b/src/core/lib/transport/error_utils.c @@ -39,8 +39,9 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, return NULL; } -void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, grpc_slice *slice, +void grpc_error_get_status(grpc_exec_ctx *exec_ctx, grpc_error *error, + grpc_millis deadline, grpc_status_code *code, + grpc_slice *slice, grpc_http2_error_code *http_error) { // Start with the parent error and recurse through the tree of children // until we find the first one that has a status code. 
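The error_utils hunks above and below move grpc_error_get_status onto the exec_ctx and a grpc_millis deadline in place of a gpr_timespec. A minimal sketch of a call site under the new signature follows; the wrapper function, the error value and the 30-second offset are illustrative assumptions, only the grpc_error_get_status signature comes from this change.

#include "src/core/lib/transport/error_utils.h"

/* Sketch: derive a status code and message from an error using the
   millisecond deadline form introduced above. */
static void example_status_from_error(grpc_exec_ctx *exec_ctx) {
  grpc_status_code code;
  grpc_slice message;
  grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 30 * GPR_MS_PER_SEC;
  grpc_error *err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("example failure");
  grpc_error_get_status(exec_ctx, err, deadline, &code, &message,
                        NULL /* http2 error code not needed here */);
  GRPC_ERROR_UNREF(err);
}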
@@ -63,8 +64,8 @@ void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, status = (grpc_status_code)integer; } else if (grpc_error_get_int(found_error, GRPC_ERROR_INT_HTTP2_ERROR, &integer)) { - status = grpc_http2_error_to_grpc_status((grpc_http2_error_code)integer, - deadline); + status = grpc_http2_error_to_grpc_status( + exec_ctx, (grpc_http2_error_code)integer, deadline); } if (code != NULL) *code = status; diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h index e530884215..6211031a24 100644 --- a/src/core/lib/transport/error_utils.h +++ b/src/core/lib/transport/error_utils.h @@ -20,6 +20,7 @@ #define GRPC_CORE_LIB_TRANSPORT_ERROR_UTILS_H #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/http2_errors.h" /// A utility function to get the status code and message to be returned @@ -28,8 +29,9 @@ /// All attributes are pulled from the same child error. If any of the /// attributes (code, msg, http_status) are unneeded, they can be passed as /// NULL. -void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, grpc_slice *slice, +void grpc_error_get_status(grpc_exec_ctx *exec_ctx, grpc_error *error, + grpc_millis deadline, grpc_status_code *code, + grpc_slice *slice, grpc_http2_error_code *http_status); /// A utility function to check whether there is a clear status code that diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index a077052561..b7c46e09bb 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -74,7 +74,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { void grpc_metadata_batch_init(grpc_metadata_batch *batch) { memset(batch, 0, sizeof(*batch)); - batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + batch->deadline = GRPC_MILLIS_INF_FUTURE; } void grpc_metadata_batch_destroy(grpc_exec_ctx *exec_ctx, @@ -270,9 +270,7 @@ void grpc_metadata_batch_clear(grpc_exec_ctx *exec_ctx, } bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { - return batch->list.head == NULL && - gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type), - batch->deadline) == 0; + return batch->list.head == NULL && batch->deadline == GRPC_MILLIS_INF_FUTURE; } size_t grpc_metadata_batch_size(grpc_metadata_batch *batch) { diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 57d298c75c..2adf1881e7 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -51,9 +51,9 @@ typedef struct grpc_metadata_batch { grpc_mdelem_list list; grpc_metadata_batch_callouts idx; /** Used to calculate grpc-timeout at the point of sending, - or gpr_inf_future if this batch does not need to send a + or GRPC_MILLIS_INF_FUTURE if this batch does not need to send a grpc-timeout */ - gpr_timespec deadline; + grpc_millis deadline; } grpc_metadata_batch; void grpc_metadata_batch_init(grpc_metadata_batch *batch); diff --git a/src/core/lib/transport/status_conversion.c b/src/core/lib/transport/status_conversion.c index 9a76977e4b..73d20a170b 100644 --- a/src/core/lib/transport/status_conversion.c +++ b/src/core/lib/transport/status_conversion.c @@ -37,8 +37,9 @@ int grpc_status_to_http2_error(grpc_status_code status) { } } -grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, - gpr_timespec deadline) { +grpc_status_code 
grpc_http2_error_to_grpc_status(grpc_exec_ctx *exec_ctx, + grpc_http2_error_code error, + grpc_millis deadline) { switch (error) { case GRPC_HTTP2_NO_ERROR: /* should never be received */ @@ -46,7 +47,7 @@ grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, case GRPC_HTTP2_CANCEL: /* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been * exceeded */ - return gpr_time_cmp(gpr_now(deadline.clock_type), deadline) >= 0 + return grpc_exec_ctx_now(exec_ctx) > deadline ? GRPC_STATUS_DEADLINE_EXCEEDED : GRPC_STATUS_CANCELLED; case GRPC_HTTP2_ENHANCE_YOUR_CALM: diff --git a/src/core/lib/transport/status_conversion.h b/src/core/lib/transport/status_conversion.h index e93f3dfd9b..a05ed7658c 100644 --- a/src/core/lib/transport/status_conversion.h +++ b/src/core/lib/transport/status_conversion.h @@ -20,12 +20,14 @@ #define GRPC_CORE_LIB_TRANSPORT_STATUS_CONVERSION_H #include <grpc/grpc.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/http2_errors.h" /* Conversion of grpc status codes to http2 error codes (for RST_STREAM) */ grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status); -grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, - gpr_timespec deadline); +grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx *exec_ctx, + grpc_http2_error_code error, + grpc_millis deadline); /* Conversion of HTTP status codes (:status) to grpc status codes */ grpc_status_code grpc_http2_status_to_grpc_status(int status); diff --git a/src/core/lib/transport/timeout_encoding.c b/src/core/lib/transport/timeout_encoding.c index 02f179d6a3..779d016ca6 100644 --- a/src/core/lib/transport/timeout_encoding.c +++ b/src/core/lib/transport/timeout_encoding.c @@ -59,60 +59,27 @@ static void enc_seconds(char *buffer, int64_t sec) { } } -static void enc_nanos(char *buffer, int64_t x) { +static void enc_millis(char *buffer, int64_t x) { x = round_up_to_three_sig_figs(x); - if (x < 100000) { - if (x % 1000 == 0) { - enc_ext(buffer, x / 1000, 'u'); - } else { - enc_ext(buffer, x, 'n'); - } - } else if (x < 100000000) { - if (x % 1000000 == 0) { - enc_ext(buffer, x / 1000000, 'm'); - } else { - enc_ext(buffer, x / 1000, 'u'); - } - } else if (x < 1000000000) { - enc_ext(buffer, x / 1000000, 'm'); + if (x < GPR_MS_PER_SEC) { + enc_ext(buffer, x, 'm'); } else { - /* note that this is only ever called with times of less than one second, - so if we reach here the time must have been rounded up to a whole second - (and no more) */ - memcpy(buffer, "1S", 3); - } -} - -static void enc_micros(char *buffer, int64_t x) { - x = round_up_to_three_sig_figs(x); - if (x < 100000) { - if (x % 1000 == 0) { - enc_ext(buffer, x / 1000, 'm'); + if (x % GPR_MS_PER_SEC == 0) { + enc_seconds(buffer, x / GPR_MS_PER_SEC); } else { - enc_ext(buffer, x, 'u'); + enc_ext(buffer, x, 'm'); } - } else if (x < 100000000) { - if (x % 1000000 == 0) { - enc_ext(buffer, x / 1000000, 'S'); - } else { - enc_ext(buffer, x / 1000, 'm'); - } - } else { - enc_ext(buffer, x / 1000000, 'S'); } } -void grpc_http2_encode_timeout(gpr_timespec timeout, char *buffer) { - if (timeout.tv_sec < 0) { +void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer) { + if (timeout <= 0) { enc_tiny(buffer); - } else if (timeout.tv_sec == 0) { - enc_nanos(buffer, timeout.tv_nsec); - } else if (timeout.tv_sec < 1000 && timeout.tv_nsec != 0) { - enc_micros(buffer, - (int64_t)(timeout.tv_sec * 1000000) + - (timeout.tv_nsec / 1000 + (timeout.tv_nsec % 1000 != 0))); + } 
else if (timeout < 1000 * GPR_MS_PER_SEC) { + enc_millis(buffer, timeout); } else { - enc_seconds(buffer, timeout.tv_sec + (timeout.tv_nsec != 0)); + enc_seconds(buffer, + timeout / GPR_MS_PER_SEC + (timeout % GPR_MS_PER_SEC != 0)); } } @@ -121,7 +88,7 @@ static int is_all_whitespace(const char *p, const char *end) { return p == end; } -int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) { +int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout) { int32_t x = 0; const uint8_t *p = GRPC_SLICE_START_PTR(text); const uint8_t *end = GRPC_SLICE_END_PTR(text); @@ -136,7 +103,7 @@ int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) { /* spec allows max. 8 digits, but we allow values up to 1,000,000,000 */ if (x >= (100 * 1000 * 1000)) { if (x != (100 * 1000 * 1000) || digit != 0) { - *timeout = gpr_inf_future(GPR_TIMESPAN); + *timeout = GRPC_MILLIS_INF_FUTURE; return 1; } } @@ -150,22 +117,22 @@ int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) { /* decode unit specifier */ switch (*p) { case 'n': - *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN); + *timeout = x / GPR_NS_PER_MS + (x % GPR_NS_PER_MS != 0); break; case 'u': - *timeout = gpr_time_from_micros(x, GPR_TIMESPAN); + *timeout = x / GPR_US_PER_MS + (x % GPR_US_PER_MS != 0); break; case 'm': - *timeout = gpr_time_from_millis(x, GPR_TIMESPAN); + *timeout = x; break; case 'S': - *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN); + *timeout = x * GPR_MS_PER_SEC; break; case 'M': - *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN); + *timeout = x * 60 * GPR_MS_PER_SEC; break; case 'H': - *timeout = gpr_time_from_hours(x, GPR_TIMESPAN); + *timeout = x * 60 * 60 * GPR_MS_PER_SEC; break; default: return 0; diff --git a/src/core/lib/transport/timeout_encoding.h b/src/core/lib/transport/timeout_encoding.h index 7ff35c4083..c8677719c4 100644 --- a/src/core/lib/transport/timeout_encoding.h +++ b/src/core/lib/transport/timeout_encoding.h @@ -22,13 +22,14 @@ #include <grpc/slice.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/string.h" #define GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE (GPR_LTOA_MIN_BUFSIZE + 1) /* Encode/decode timeouts to the GRPC over HTTP/2 format; encoding may round up arbitrarily */ -void grpc_http2_encode_timeout(gpr_timespec timeout, char *buffer); -int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout); +void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer); +int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout); #endif /* GRPC_CORE_LIB_TRANSPORT_TIMEOUT_ENCODING_H */ diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 409a6c4103..2d0b1e78ea 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -48,10 +48,9 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); put_metadata(b, m->md); } - if (gpr_time_cmp(md.deadline, gpr_inf_future(md.deadline.clock_type)) != 0) { + if (md.deadline != GRPC_MILLIS_INF_FUTURE) { char *tmp; - gpr_asprintf(&tmp, " deadline=%" PRId64 ".%09d", md.deadline.tv_sec, - md.deadline.tv_nsec); + gpr_asprintf(&tmp, " deadline=%" PRIdPTR, md.deadline); gpr_strvec_add(b, tmp); } } |
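grpc_http2_encode_timeout and grpc_http2_decode_timeout now work in grpc_millis, and wire units finer than a millisecond decode with round-up. A small round-trip sketch; the wrapper function and the 2500 ms sample value are illustrative.

#include <grpc/support/log.h>
#include "src/core/lib/transport/timeout_encoding.h"

/* Sketch: encode a millisecond timeout to the grpc-timeout wire format and
   decode it back. */
static void example_timeout_roundtrip(void) {
  char buf[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
  grpc_millis timeout = 2500; /* 2.5 seconds, encodes as "2500m" */
  grpc_http2_encode_timeout(timeout, buf);

  grpc_millis decoded;
  grpc_slice text = grpc_slice_from_static_string(buf);
  /* sub-millisecond wire values such as "500u" would decode to 1 here */
  GPR_ASSERT(grpc_http2_decode_timeout(text, &decoded));
  GPR_ASSERT(decoded == timeout);
}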
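The surface hunks above (alarm, channel, completion queue, server) keep gpr_timespec at the public API and convert once at the boundary with grpc_timespec_to_millis_round_up and grpc_millis_to_timespec. A sketch of that pattern; the wrapper function, the timer argument and the pre-initialized on_fire closure are assumptions, while the conversion helpers and the grpc_timer_init signature are as shown in this diff.

#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"

/* Sketch: take a wall-clock deadline from the public API, convert it to
   grpc_millis for core, and convert back when reporting it to the user. */
static void example_arm_timer(gpr_timespec api_deadline, grpc_timer *timer,
                              grpc_closure *on_fire) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_millis deadline = grpc_timespec_to_millis_round_up(api_deadline);
  grpc_timer_init(&exec_ctx, timer, deadline, on_fire);
  /* e.g. when filling request details, hand the deadline back as realtime */
  gpr_timespec user_deadline =
      grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME);
  (void)user_deadline;
  grpc_exec_ctx_finish(&exec_ctx);
}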
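The timer code above reads time through the exec_ctx cache, with grpc_exec_ctx_invalidate_now forcing a fresh read before each check. A compressed sketch of one pass of that loop, with shutdown and kick handling omitted; only calls shown in the hunks above are used, and the exec_ctx is assumed to be supplied by the caller.

#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"

/* Sketch: refresh the cached clock, check timers, and run whatever fired. */
static void example_timer_pass(grpc_exec_ctx *exec_ctx) {
  grpc_millis next = GRPC_MILLIS_INF_FUTURE;
  grpc_exec_ctx_invalidate_now(exec_ctx);
  if (grpc_timer_check(exec_ctx, &next) == GRPC_TIMERS_FIRED) {
    grpc_exec_ctx_flush(exec_ctx); /* run closures scheduled by fired timers */
  }
  /* otherwise 'next' holds the earliest pending deadline in grpc_millis */
}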