aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/backoff/backoff.c (renamed from src/core/lib/support/backoff.c)27
-rw-r--r--src/core/lib/backoff/backoff.h (renamed from src/core/lib/support/backoff.h)32
-rw-r--r--src/core/lib/channel/channel_stack.h2
-rw-r--r--src/core/lib/channel/handshaker.c6
-rw-r--r--src/core/lib/channel/handshaker.h2
-rw-r--r--src/core/lib/compression/stream_compression.h2
-rw-r--r--src/core/lib/http/httpcli.c13
-rw-r--r--src/core/lib/http/httpcli.h12
-rw-r--r--src/core/lib/http/httpcli_security_connector.c2
-rw-r--r--src/core/lib/iomgr/block_annotate.h (renamed from src/core/lib/support/block_annotate.h)14
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c51
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c78
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c41
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c43
-rw-r--r--src/core/lib/iomgr/ev_posix.c6
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/exec_ctx.c62
-rw-r--r--src/core/lib/iomgr/exec_ctx.h17
-rw-r--r--src/core/lib/iomgr/iomgr.c7
-rw-r--r--src/core/lib/iomgr/load_file.c4
-rw-r--r--src/core/lib/iomgr/pollset.h4
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client.h2
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_posix.c10
-rw-r--r--src/core/lib/iomgr/timer.h7
-rw-r--r--src/core/lib/iomgr/timer_generic.c124
-rw-r--r--src/core/lib/iomgr/timer_manager.c49
-rw-r--r--src/core/lib/security/credentials/google_default/google_default_credentials.c7
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.c12
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.h2
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c37
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h6
-rw-r--r--src/core/lib/support/time_posix.c4
-rw-r--r--src/core/lib/support/time_windows.c4
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c54
-rw-r--r--src/core/lib/surface/call.h2
-rw-r--r--src/core/lib/surface/channel.c9
-rw-r--r--src/core/lib/surface/channel.h2
-rw-r--r--src/core/lib/surface/completion_queue.c49
-rw-r--r--src/core/lib/surface/lame_client.cc2
-rw-r--r--src/core/lib/surface/server.c16
-rw-r--r--src/core/lib/transport/error_utils.c9
-rw-r--r--src/core/lib/transport/error_utils.h6
-rw-r--r--src/core/lib/transport/metadata_batch.c6
-rw-r--r--src/core/lib/transport/metadata_batch.h4
-rw-r--r--src/core/lib/transport/status_conversion.c7
-rw-r--r--src/core/lib/transport/status_conversion.h6
-rw-r--r--src/core/lib/transport/timeout_encoding.c73
-rw-r--r--src/core/lib/transport/timeout_encoding.h5
-rw-r--r--src/core/lib/transport/transport_op_string.c5
53 files changed, 455 insertions, 516 deletions
diff --git a/src/core/lib/support/backoff.c b/src/core/lib/backoff/backoff.c
index 6dc0df473b..e964af8f39 100644
--- a/src/core/lib/support/backoff.c
+++ b/src/core/lib/backoff/backoff.c
@@ -16,13 +16,14 @@
*
*/
-#include "src/core/lib/support/backoff.h"
+#include "src/core/lib/backoff/backoff.h"
#include <grpc/support/useful.h>
-void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
- double multiplier, double jitter,
- int64_t min_timeout_millis, int64_t max_timeout_millis) {
+void grpc_backoff_init(grpc_backoff *backoff,
+ grpc_millis initial_connect_timeout, double multiplier,
+ double jitter, grpc_millis min_timeout_millis,
+ grpc_millis max_timeout_millis) {
backoff->initial_connect_timeout = initial_connect_timeout;
backoff->multiplier = multiplier;
backoff->jitter = jitter;
@@ -31,11 +32,11 @@ void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec;
}
-gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) {
+grpc_millis grpc_backoff_begin(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff) {
backoff->current_timeout_millis = backoff->initial_connect_timeout;
const int64_t first_timeout =
GPR_MAX(backoff->current_timeout_millis, backoff->min_timeout_millis);
- return gpr_time_add(now, gpr_time_from_millis(first_timeout, GPR_TIMESPAN));
+ return grpc_exec_ctx_now(exec_ctx) + first_timeout;
}
/* Generate a random number between 0 and 1. */
@@ -44,7 +45,7 @@ static double generate_uniform_random_number(uint32_t *rng_state) {
return *rng_state / (double)((uint32_t)1 << 31);
}
-gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) {
+grpc_millis grpc_backoff_step(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff) {
const double new_timeout_millis =
backoff->multiplier * (double)backoff->current_timeout_millis;
backoff->current_timeout_millis =
@@ -58,15 +59,15 @@ gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) {
backoff->current_timeout_millis =
(int64_t)((double)(backoff->current_timeout_millis) + jitter);
- const gpr_timespec current_deadline = gpr_time_add(
- now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN));
+ const grpc_millis current_deadline =
+ grpc_exec_ctx_now(exec_ctx) + backoff->current_timeout_millis;
- const gpr_timespec min_deadline = gpr_time_add(
- now, gpr_time_from_millis(backoff->min_timeout_millis, GPR_TIMESPAN));
+ const grpc_millis min_deadline =
+ grpc_exec_ctx_now(exec_ctx) + backoff->min_timeout_millis;
- return gpr_time_max(current_deadline, min_deadline);
+ return GPR_MAX(current_deadline, min_deadline);
}
-void gpr_backoff_reset(gpr_backoff *backoff) {
+void grpc_backoff_reset(grpc_backoff *backoff) {
backoff->current_timeout_millis = backoff->initial_connect_timeout;
}
diff --git a/src/core/lib/support/backoff.h b/src/core/lib/backoff/backoff.h
index 6e0cc3a4b6..c0798bbe5b 100644
--- a/src/core/lib/support/backoff.h
+++ b/src/core/lib/backoff/backoff.h
@@ -16,41 +16,43 @@
*
*/
-#ifndef GRPC_CORE_LIB_SUPPORT_BACKOFF_H
-#define GRPC_CORE_LIB_SUPPORT_BACKOFF_H
+#ifndef GRPC_CORE_LIB_BACKOFF_BACKOFF_H
+#define GRPC_CORE_LIB_BACKOFF_BACKOFF_H
-#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
/// const: how long to wait after the first failure before retrying
- int64_t initial_connect_timeout;
+ grpc_millis initial_connect_timeout;
/// const: factor with which to multiply backoff after a failed retry
double multiplier;
/// const: amount to randomize backoffs
double jitter;
/// const: minimum time between retries in milliseconds
- int64_t min_timeout_millis;
+ grpc_millis min_timeout_millis;
/// const: maximum time between retries in milliseconds
- int64_t max_timeout_millis;
+ grpc_millis max_timeout_millis;
/// random number generator
uint32_t rng_state;
/// current retry timeout in milliseconds
int64_t current_timeout_millis;
-} gpr_backoff;
+} grpc_backoff;
/// Initialize backoff machinery - does not need to be destroyed
-void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
- double multiplier, double jitter,
- int64_t min_timeout_millis, int64_t max_timeout_millis);
+void grpc_backoff_init(grpc_backoff *backoff,
+ grpc_millis initial_connect_timeout, double multiplier,
+ double jitter, grpc_millis min_timeout_millis,
+ grpc_millis max_timeout_millis);
/// Begin retry loop: returns a timespec for the NEXT retry
-gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now);
+grpc_millis grpc_backoff_begin(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff);
/// Step a retry loop: returns a timespec for the NEXT retry
-gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now);
-/// Reset the backoff, so the next gpr_backoff_step will be a gpr_backoff_begin
+grpc_millis grpc_backoff_step(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff);
+/// Reset the backoff, so the next grpc_backoff_step will be a
+/// grpc_backoff_begin
/// instead
-void gpr_backoff_reset(gpr_backoff *backoff);
+void grpc_backoff_reset(grpc_backoff *backoff);
-#endif /* GRPC_CORE_LIB_SUPPORT_BACKOFF_H */
+#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index ae1cac31f7..e616a51024 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -70,7 +70,7 @@ typedef struct {
grpc_call_context_element *context;
grpc_slice path;
gpr_timespec start_time;
- gpr_timespec deadline;
+ grpc_millis deadline;
gpr_arena *arena;
grpc_call_combiner *call_combiner;
} grpc_call_element_args;
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 1753da5721..b27ee37e5b 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -232,7 +232,7 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
- gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data) {
gpr_mu_lock(&mgr->mu);
GPR_ASSERT(mgr->index == 0);
@@ -255,9 +255,7 @@ void grpc_handshake_manager_do_handshake(
gpr_ref(&mgr->refs);
GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr,
grpc_schedule_on_exec_ctx);
- grpc_timer_init(exec_ctx, &mgr->deadline_timer,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_timer_init(exec_ctx, &mgr->deadline_timer, deadline, &mgr->on_timeout);
// Start first handshaker, which also owns a ref.
gpr_ref(&mgr->refs);
bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE);
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index eb9a59bd08..09b4a27c1c 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -145,7 +145,7 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
- gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data);
/// Add \a mgr to the server side list of all pending handshake managers, the
diff --git a/src/core/lib/compression/stream_compression.h b/src/core/lib/compression/stream_compression.h
index 844dff81a3..0daaa9e655 100644
--- a/src/core/lib/compression/stream_compression.h
+++ b/src/core/lib/compression/stream_compression.h
@@ -87,4 +87,4 @@ grpc_stream_compression_context *grpc_stream_compression_context_create(
void grpc_stream_compression_context_destroy(
grpc_stream_compression_context *ctx);
-#endif
+#endif /* GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_H */
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 84cc39604c..7d768e9556 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -44,7 +44,7 @@ typedef struct {
grpc_endpoint *ep;
char *host;
char *ssl_host_override;
- gpr_timespec deadline;
+ grpc_millis deadline;
int have_read_byte;
const grpc_httpcli_handshaker *handshaker;
grpc_closure *on_done;
@@ -65,7 +65,7 @@ static grpc_httpcli_post_override g_post_override = NULL;
static void plaintext_handshake(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *endpoint, const char *host,
- gpr_timespec deadline,
+ grpc_millis deadline,
void (*on_done)(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_endpoint *endpoint)) {
@@ -240,7 +240,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
- gpr_timespec deadline, grpc_closure *on_done,
+ grpc_millis deadline, grpc_closure *on_done,
grpc_httpcli_response *response,
const char *name, grpc_slice request_text) {
internal_request *req =
@@ -278,9 +278,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_polling_entity *pollent,
grpc_resource_quota *resource_quota,
- const grpc_httpcli_request *request,
- gpr_timespec deadline, grpc_closure *on_done,
- grpc_httpcli_response *response) {
+ const grpc_httpcli_request *request, grpc_millis deadline,
+ grpc_closure *on_done, grpc_httpcli_response *response) {
char *name;
if (g_get_override &&
g_get_override(exec_ctx, request, deadline, on_done, response)) {
@@ -298,7 +297,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
- gpr_timespec deadline, grpc_closure *on_done,
+ grpc_millis deadline, grpc_closure *on_done,
grpc_httpcli_response *response) {
char *name;
if (g_post_override &&
diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h
index 809618695e..a3baf512a3 100644
--- a/src/core/lib/http/httpcli.h
+++ b/src/core/lib/http/httpcli.h
@@ -42,7 +42,7 @@ typedef struct grpc_httpcli_context {
typedef struct {
const char *default_port;
void (*handshake)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint,
- const char *host, gpr_timespec deadline,
+ const char *host, grpc_millis deadline,
void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *endpoint));
} grpc_httpcli_handshaker;
@@ -83,8 +83,8 @@ void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx,
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_polling_entity *pollent,
grpc_resource_quota *resource_quota,
- const grpc_httpcli_request *request,
- gpr_timespec deadline, grpc_closure *on_complete,
+ const grpc_httpcli_request *request, grpc_millis deadline,
+ grpc_closure *on_complete,
grpc_httpcli_response *response);
/* Asynchronously perform a HTTP POST.
@@ -106,18 +106,18 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
- gpr_timespec deadline, grpc_closure *on_complete,
+ grpc_millis deadline, grpc_closure *on_complete,
grpc_httpcli_response *response);
/* override functions return 1 if they handled the request, 0 otherwise */
typedef int (*grpc_httpcli_get_override)(grpc_exec_ctx *exec_ctx,
const grpc_httpcli_request *request,
- gpr_timespec deadline,
+ grpc_millis deadline,
grpc_closure *on_complete,
grpc_httpcli_response *response);
typedef int (*grpc_httpcli_post_override)(
grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request,
- const char *body_bytes, size_t body_size, gpr_timespec deadline,
+ const char *body_bytes, size_t body_size, grpc_millis deadline,
grpc_closure *on_complete, grpc_httpcli_response *response);
void grpc_httpcli_set_override(grpc_httpcli_get_override get,
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 97c2886525..37ee08783e 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -155,7 +155,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *tcp, const char *host,
- gpr_timespec deadline,
+ grpc_millis deadline,
void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *endpoint)) {
on_done_closure *c = gpr_malloc(sizeof(*c));
diff --git a/src/core/lib/support/block_annotate.h b/src/core/lib/iomgr/block_annotate.h
index 8e3ef7df65..cbcb5d92f0 100644
--- a/src/core/lib/support/block_annotate.h
+++ b/src/core/lib/iomgr/block_annotate.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H
-#define GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H
+#ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H
+#define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H
#ifdef __cplusplus
extern "C" {
@@ -47,9 +47,13 @@ void gpr_thd_end_blocking_region();
#define GRPC_SCHEDULING_START_BLOCKING_REGION \
do { \
} while (0)
-#define GRPC_SCHEDULING_END_BLOCKING_REGION \
- do { \
+#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \
+ do { \
+ } while (0)
+#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \
+ do { \
+ grpc_exec_ctx_invalidate_now((ec)); \
} while (0)
#endif
-#endif /* GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H */
+#endif /* GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 0d57833d85..4e2746ad6c 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -25,6 +25,7 @@
#include <assert.h>
#include <errno.h>
+#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
@@ -40,12 +41,12 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
@@ -555,22 +556,17 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_END("pollset_shutdown", 0);
}
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now) {
- gpr_timespec timeout;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
- return -1;
- }
-
- if (gpr_time_cmp(deadline, now) <= 0) {
+static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
+ grpc_millis millis) {
+ if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
+ grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
+ if (delta > INT_MAX) {
+ return INT_MAX;
+ } else if (delta < 0) {
return 0;
+ } else {
+ return (int)delta;
}
-
- static const gpr_timespec round_up = {
- .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
- timeout = gpr_time_sub(deadline, now);
- int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
- return millis >= 1 ? millis : 1;
}
/* Process the epoll events found by do_epoll_wait() function.
@@ -627,11 +623,11 @@ static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
(i.e the designated poller thread) will be calling this function. So there is
no need for any synchronization when accesing fields in g_epoll_set */
static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
- gpr_timespec now, gpr_timespec deadline) {
+ grpc_millis deadline) {
GPR_TIMER_BEGIN("do_epoll_wait", 0);
int r;
- int timeout = poll_deadline_to_millis_timeout(deadline, now);
+ int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (timeout != 0) {
GRPC_SCHEDULING_START_BLOCKING_REGION;
}
@@ -641,7 +637,7 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
}
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
@@ -657,9 +653,10 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
return GRPC_ERROR_NONE;
}
-static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
- grpc_pollset_worker **worker_hdl, gpr_timespec *now,
- gpr_timespec deadline) {
+static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ grpc_pollset_worker **worker_hdl,
+ grpc_millis deadline) {
GPR_TIMER_BEGIN("begin_worker", 0);
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
@@ -744,14 +741,15 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->shutting_down);
}
- if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
+ if (gpr_cv_wait(&worker->cv, &pollset->mu,
+ grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) &&
worker->state == UNKICKED) {
/* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
received a kick */
SET_KICK_STATE(worker, KICKED);
}
}
- *now = gpr_now(now->clock_type);
+ grpc_exec_ctx_invalidate_now(exec_ctx);
}
if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -926,7 +924,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
+ grpc_millis deadline) {
grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
@@ -937,7 +935,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
return GRPC_ERROR_NONE;
}
- if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
+ if (begin_worker(exec_ctx, ps, &worker, worker_hdl, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!ps->shutting_down);
@@ -960,8 +958,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
designated poller */
if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
gpr_atm_acq_load(&g_epoll_set.num_events)) {
- append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
- err_desc);
+ append_error(&error, do_epoll_wait(exec_ctx, ps, deadline), err_desc);
}
append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 51200fc064..959a8f4b4f 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -25,6 +25,7 @@
#include <assert.h>
#include <errno.h>
+#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
@@ -38,7 +39,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
#include "src/core/lib/iomgr/lockfree_event.h"
@@ -46,20 +47,18 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/spinlock.h"
/*******************************************************************************
* Polling object
*/
-
typedef enum {
PO_POLLING_GROUP,
PO_POLLSET_SET,
PO_POLLSET,
- PO_FD, /* ordering is important: we always want to lock pollsets before fds:
- this guarantees that using an fd as a pollable is safe */
- PO_EMPTY_POLLABLE,
+ PO_FD,
+ /* ordering is important: we always want to lock pollsets before fds:
+ this guarantees that using an fd as a pollable is safe */ PO_EMPTY_POLLABLE,
PO_COUNT
} polling_obj_type;
@@ -687,29 +686,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &pollset->pollable_obj.po.mu;
}
-/* Convert a timespec to milliseconds:
- - Very small or negative poll times are clamped to zero to do a non-blocking
- poll (which becomes spin polling)
- - Other small values are rounded up to one millisecond
- - Longer than a millisecond polls are rounded up to the next nearest
- millisecond to avoid spinning
- - Infinite timeouts are converted to -1 */
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now) {
- gpr_timespec timeout;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
- return -1;
- }
-
- if (gpr_time_cmp(deadline, now) <= 0) {
+static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
+ grpc_millis millis) {
+ if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
+ grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
+ if (delta > INT_MAX)
+ return INT_MAX;
+ else if (delta < 0)
return 0;
- }
-
- static const gpr_timespec round_up = {
- .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
- timeout = gpr_time_sub(deadline, now);
- int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
- return millis >= 1 ? millis : 1;
+ else
+ return (int)delta;
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@@ -804,9 +790,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- pollable *p, gpr_timespec now,
- gpr_timespec deadline) {
- int timeout = poll_deadline_to_millis_timeout(deadline, now);
+ pollable *p, grpc_millis deadline) {
+ int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
char *desc = pollable_desc(p);
@@ -823,7 +808,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
}
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
@@ -878,9 +863,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root,
}
/* Return true if this thread should poll */
-static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
- grpc_pollset_worker **worker_hdl, gpr_timespec *now,
- gpr_timespec deadline) {
+static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ grpc_pollset_worker **worker_hdl,
+ grpc_millis deadline) {
bool do_poll = true;
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
@@ -904,10 +890,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
worker->pollable_obj, worker,
- poll_deadline_to_millis_timeout(deadline, *now));
+ poll_deadline_to_millis_timeout(exec_ctx, deadline));
}
while (do_poll && worker->pollable_obj->root_worker != worker) {
- if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) {
+ if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu,
+ grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
worker->pollable_obj, worker);
@@ -930,7 +917,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_mu_lock(&pollset->pollable_obj.po.mu);
gpr_mu_lock(&worker->pollable_obj->po.mu);
}
- *now = gpr_now(now->clock_type);
+ grpc_exec_ctx_invalidate_now(exec_ctx);
}
return do_poll && pollset->shutdown_closure == NULL &&
@@ -961,14 +948,13 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
+ grpc_millis deadline) {
grpc_pollset_worker worker;
if (0 && GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64
- ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p",
- pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec,
- deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller,
- pollset->root_worker);
+ gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR
+ " deadline=%" PRIdPTR " kwp=%d root_worker=%p",
+ pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline,
+ pollset->kicked_without_poller, pollset->root_worker);
}
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
@@ -979,7 +965,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->current_pollable_obj != &pollset->pollable_obj) {
gpr_mu_lock(&pollset->current_pollable_obj->po.mu);
}
- if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
+ if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
@@ -990,7 +976,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_unlock(&pollset->pollable_obj.po.mu);
if (pollset->event_cursor == pollset->event_count) {
append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj,
- now, deadline),
+ deadline),
err_desc);
}
append_error(&error, pollset_process_events(exec_ctx, pollset, false),
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index b88c3ba111..b612df50ea 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -25,6 +25,7 @@
#include <assert.h>
#include <errno.h>
+#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
@@ -40,13 +41,13 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@@ -1088,30 +1089,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutdown_done = NULL;
}
-/* Convert a timespec to milliseconds:
- - Very small or negative poll times are clamped to zero to do a non-blocking
- poll (which becomes spin polling)
- - Other small values are rounded up to one millisecond
- - Longer than a millisecond polls are rounded up to the next nearest
- millisecond to avoid spinning
- - Infinite timeouts are converted to -1 */
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now) {
- gpr_timespec timeout;
- static const int64_t max_spin_polling_us = 10;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
- return -1;
- }
-
- if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
- max_spin_polling_us,
- GPR_TIMESPAN))) <= 0) {
+static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
+ grpc_millis millis) {
+ if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
+ grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
+ if (delta > INT_MAX)
+ return INT_MAX;
+ else if (delta < 0)
return 0;
- }
- timeout = gpr_time_sub(deadline, now);
- int millis = gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
- return millis >= 1 ? millis : 1;
+ else
+ return (int)delta;
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@@ -1241,7 +1228,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
ep_rv =
epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_asprintf(&err_msg,
@@ -1308,10 +1295,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
+ grpc_millis deadline) {
GPR_TIMER_BEGIN("pollset_work", 0);
grpc_error *error = GRPC_ERROR_NONE;
- int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
+ int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline);
sigset_t new_mask;
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 7f44eda138..7a5abf4650 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -24,6 +24,7 @@
#include <assert.h>
#include <errno.h>
+#include <limits.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
@@ -37,12 +38,11 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
-#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/murmur_hash.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@@ -50,7 +50,6 @@
/*******************************************************************************
* FD declarations
*/
-
typedef struct grpc_fd_watcher {
struct grpc_fd_watcher *next;
struct grpc_fd_watcher *prev;
@@ -200,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
- longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now);
+static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
+ grpc_millis deadline);
/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
@@ -872,7 +871,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) {
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
+ grpc_millis deadline) {
grpc_pollset_worker worker;
if (worker_hdl) *worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE;
@@ -941,7 +940,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd_watcher *watchers;
struct pollfd *pfds;
- timeout = poll_deadline_to_millis_timeout(deadline, now);
+ timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (pollset->fd_count + 2 <= inline_elements) {
pfds = pollfd_space;
@@ -987,7 +986,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GRPC_SCHEDULING_START_BLOCKING_REGION;
GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
r = grpc_poll_function(pfds, pfd_count, timeout);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
@@ -1064,13 +1063,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (queued_work || worker.kicked_specifically) {
/* If there's queued work on the list, then set the deadline to be
immediate so we get back out of the polling loop quickly */
- deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ deadline = 0;
}
keep_polling = 1;
}
- if (keep_polling) {
- now = gpr_now(now.clock_type);
- }
}
gpr_tls_set(&g_current_thread_poller, 0);
if (added_worker) {
@@ -1122,21 +1118,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now) {
- gpr_timespec timeout;
- static const int64_t max_spin_polling_us = 10;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
- return -1;
- }
- if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
- max_spin_polling_us,
- GPR_TIMESPAN))) <= 0) {
- return 0;
- }
- timeout = gpr_time_sub(deadline, now);
- return gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
+ grpc_millis deadline) {
+ if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
+ if (deadline == 0) return 0;
+ grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx);
+ if (n < 0) return 0;
+ if (n > INT_MAX) return -1;
+ return (int)n;
}
/*******************************************************************************
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index d881e2d4dd..ffa8beb76c 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -205,9 +205,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline) {
- return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
+ grpc_pollset_worker **worker,
+ grpc_millis deadline) {
+ return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline);
}
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1108e46ef8..dfc82fc8c6 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -52,8 +52,8 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline);
+ grpc_pollset_worker **worker,
+ grpc_millis deadline);
grpc_error *(*pollset_kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 41c69add17..3d17afcb8f 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -104,9 +104,69 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
-void grpc_exec_ctx_global_init(void) {}
+static gpr_timespec
+ g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the
+ // last enum value in
+ // gpr_clock_type
+
+void grpc_exec_ctx_global_init(void) {
+ for (int i = 0; i < GPR_TIMESPAN; i++) {
+ g_start_time[i] = gpr_now((gpr_clock_type)i);
+ }
+ // allows uniform treatment in conversion functions
+ g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN);
+}
+
void grpc_exec_ctx_global_shutdown(void) {}
+static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
+ ts = gpr_time_sub(ts, g_start_time[ts.clock_type]);
+ double x =
+ GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
+ if (x < 0) return 0;
+ if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
+ return (gpr_atm)x;
+}
+
+static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
+ ts = gpr_time_sub(ts, g_start_time[ts.clock_type]);
+ double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
+ (double)ts.tv_nsec / GPR_NS_PER_MS +
+ (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC;
+ if (x < 0) return 0;
+ if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
+ return (gpr_atm)x;
+}
+
+grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) {
+ if (!exec_ctx->now_is_valid) {
+ exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC));
+ exec_ctx->now_is_valid = true;
+ }
+ return exec_ctx->now;
+}
+
+void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) {
+ exec_ctx->now_is_valid = false;
+}
+
+gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
+ gpr_clock_type clock_type) {
+ if (clock_type == GPR_TIMESPAN) {
+ return gpr_time_from_millis(millis, GPR_TIMESPAN);
+ }
+ return gpr_time_add(g_start_time[clock_type],
+ gpr_time_from_millis(millis, GPR_TIMESPAN));
+}
+
+grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) {
+ return timespec_to_atm_round_down(ts);
+}
+
+grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) {
+ return timespec_to_atm_round_up(ts);
+}
+
static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
exec_ctx_run, exec_ctx_sched, "exec_ctx"};
static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index c89792c8c4..6220e41a4f 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -19,10 +19,14 @@
#ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
+#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
+
#include "src/core/lib/iomgr/closure.h"
-/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */
+typedef gpr_atm grpc_millis;
+
+#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
@@ -66,6 +70,9 @@ struct grpc_exec_ctx {
unsigned starting_cpu;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
+
+ bool now_is_valid;
+ grpc_millis now;
};
/* initializer for grpc_exec_ctx:
@@ -73,7 +80,7 @@ struct grpc_exec_ctx {
#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \
{ \
GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, gpr_cpu_current_cpu(), \
- finish_check_arg, finish_check \
+ finish_check_arg, finish_check, false, 0 \
}
/* initialize an execution context at the top level of an API call into grpc
@@ -106,4 +113,10 @@ void grpc_exec_ctx_global_init(void);
void grpc_exec_ctx_global_init(void);
void grpc_exec_ctx_global_shutdown(void);
+grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx);
+void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx);
+gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
+grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
+grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
+
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 1feea6d628..7e0c7bf5ce 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -48,7 +48,7 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) {
gpr_cv_init(&g_rcv);
grpc_exec_ctx_global_init();
grpc_executor_init(exec_ctx);
- grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_timer_list_init(exec_ctx);
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_network_status_init();
@@ -95,8 +95,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) ==
- GRPC_TIMERS_FIRED) {
+ exec_ctx->now_is_valid = true;
+ exec_ctx->now = GRPC_MILLIS_INF_FUTURE;
+ if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();
diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c
index 0b4d41ea4b..5cb4099ea4 100644
--- a/src/core/lib/iomgr/load_file.c
+++ b/src/core/lib/iomgr/load_file.c
@@ -25,7 +25,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/support/string.h"
grpc_error *grpc_load_file(const char *filename, int add_null_terminator,
@@ -73,6 +73,6 @@ end:
GRPC_ERROR_UNREF(error);
error = error_out;
}
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
return error;
}
diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h
index a609a3877a..49d4251894 100644
--- a/src/core/lib/iomgr/pollset.h
+++ b/src/core/lib/iomgr/pollset.h
@@ -71,8 +71,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
pollset
lock */
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline) GRPC_MUST_USE_RESULT;
+ grpc_pollset_worker **worker,
+ grpc_millis deadline) GRPC_MUST_USE_RESULT;
/* Break one polling thread out of polling work for this pollset.
If specific_worker is non-NULL, then kick that worker. */
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 082e3b7947..c84e0b54e3 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -33,10 +33,10 @@
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
-#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/string.h"
static grpc_error *blocking_resolve_address_impl(
@@ -81,7 +81,7 @@ static grpc_error *blocking_resolve_address_impl(
GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
if (s != 0) {
/* Retry if well-known service name is recognized */
@@ -90,7 +90,7 @@ static grpc_error *blocking_resolve_address_impl(
if (strcmp(port, svc[i][0]) == 0) {
GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, svc[i][1], &hints, &result);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
break;
}
}
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 0cb0029f4e..305930de44 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -33,10 +33,10 @@
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/string.h"
typedef struct {
@@ -86,7 +86,7 @@ static grpc_error *blocking_resolve_address_impl(
GRPC_SCHEDULING_START_BLOCKING_REGION;
s = getaddrinfo(host, port, &hints, &result);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
if (s != 0) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo");
goto done;
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index 6c9e51ae84..c04fb8bc07 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -35,6 +35,6 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
- gpr_timespec deadline);
+ grpc_millis deadline);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 39dbb506e2..bb348bdf72 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -244,7 +244,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
- gpr_timespec deadline) {
+ grpc_millis deadline) {
int fd;
grpc_dualstack_mode dsmode;
int err;
@@ -325,9 +325,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&ac->mu);
GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
- grpc_timer_init(exec_ctx, &ac->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm);
grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
@@ -341,14 +339,14 @@ void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
- gpr_timespec deadline) = tcp_client_connect_impl;
+ grpc_millis deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
- gpr_timespec deadline) {
+ grpc_millis deadline) {
grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
channel_args, addr, deadline);
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 7e271294fd..7fcaef7679 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -135,13 +135,11 @@ static void run_poller(grpc_exec_ctx *exec_ctx, void *bp,
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
}
gpr_mu_lock(p->pollset_mu);
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_timespec deadline =
- gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN));
+ grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC;
GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx);
- GRPC_LOG_IF_ERROR("backup_poller:pollset_work",
- grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL,
- now, deadline));
+ GRPC_LOG_IF_ERROR(
+ "backup_poller:pollset_work",
+ grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, deadline));
gpr_mu_unlock(p->pollset_mu);
/* last "uncovered" notification is the ref that keeps us polling, if we get
* there try a cas to release it */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index ac392f87fe..e51cb44fbc 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -41,8 +41,7 @@ typedef struct grpc_timer grpc_timer;
application callback is also responsible for maintaining information about
when to free up any user-level state. */
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
- gpr_timespec deadline, grpc_closure *closure,
- gpr_timespec now);
+ grpc_millis deadline, grpc_closure *closure);
/* Initialize *timer without setting it. This can later be passed through
the regular init or cancel */
@@ -92,8 +91,8 @@ typedef enum {
with high probability at least one thread in the system will see an update
at any time slice. */
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
- gpr_timespec now, gpr_timespec *next);
-void grpc_timer_list_init(gpr_timespec now);
+ grpc_millis *next);
+void grpc_timer_list_init(grpc_exec_ctx *exec_ctx);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);
/* Consume a kick issued by grpc_kick_poller */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index c08bb525b7..c69084d680 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -99,9 +99,6 @@ static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
-static gpr_clock_type g_clock_type;
-static gpr_timespec g_start_time;
-
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) {
return GPR_ATM_MAX;
@@ -114,51 +111,18 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
gpr_atm *next,
grpc_error *error);
-static gpr_timespec dbl_to_ts(double d) {
- gpr_timespec ts;
- ts.tv_sec = (int64_t)d;
- ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
- ts.clock_type = GPR_TIMESPAN;
- return ts;
-}
-
-static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
- ts = gpr_time_sub(ts, g_start_time);
- double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
- (double)ts.tv_nsec / GPR_NS_PER_MS +
- (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC;
- if (x < 0) return 0;
- if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
- return (gpr_atm)x;
-}
-
-static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
- ts = gpr_time_sub(ts, g_start_time);
- double x =
- GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
- if (x < 0) return 0;
- if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
- return (gpr_atm)x;
-}
-
-static gpr_timespec atm_to_timespec(gpr_atm x) {
- return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
-}
-
static gpr_atm compute_min_deadline(timer_shard *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
}
-void grpc_timer_list_init(gpr_timespec now) {
+void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
uint32_t i;
g_shared_mutables.initialized = true;
gpr_mu_init(&g_shared_mutables.mu);
- g_clock_type = now.clock_type;
- g_start_time = now;
- g_shared_mutables.min_timer = timespec_to_atm_round_down(now);
+ g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx);
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
grpc_register_tracer(&grpc_timer_trace);
@@ -193,10 +157,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
g_shared_mutables.initialized = false;
}
-static double ts_to_dbl(gpr_timespec ts) {
- return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
-}
-
/* returns true if the first element in the list */
static void list_join(grpc_timer *head, grpc_timer *timer) {
timer->next = head;
@@ -237,20 +197,16 @@ static void note_deadline_change(timer_shard *shard) {
void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; }
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
- gpr_timespec deadline, grpc_closure *closure,
- gpr_timespec now) {
+ grpc_millis deadline, grpc_closure *closure) {
int is_first_timer = 0;
timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
- GPR_ASSERT(deadline.clock_type == g_clock_type);
- GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
- gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline);
+ timer->deadline = deadline;
if (GRPC_TRACER_ON(grpc_timer_trace)) {
- gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR
- "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]",
- timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec,
- now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb);
+ gpr_log(GPR_DEBUG,
+ "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
+ deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb);
}
if (!g_shared_mutables.initialized) {
@@ -263,7 +219,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_mu_lock(&shard->mu);
timer->pending = true;
- if (gpr_time_cmp(deadline, now) <= 0) {
+ grpc_millis now = grpc_exec_ctx_now(exec_ctx);
+ if (deadline <= now) {
timer->pending = false;
GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
@@ -272,8 +229,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
grpc_time_averaged_stats_add_sample(&shard->stats,
- ts_to_dbl(gpr_time_sub(deadline, now)));
- if (deadline_atm < shard->queue_deadline_cap) {
+ (double)(deadline - now) / 1000.0);
+ if (deadline < shard->queue_deadline_cap) {
is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else {
timer->heap_index = INVALID_HEAP_INDEX;
@@ -304,12 +261,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
shard->min_deadline);
}
- if (deadline_atm < shard->min_deadline) {
+ if (deadline < shard->min_deadline) {
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
- shard->min_deadline = deadline_atm;
+ shard->min_deadline = deadline;
note_deadline_change(shard);
- if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) {
- gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm);
+ if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
+ gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline);
grpc_kick_poller();
}
}
@@ -408,8 +365,9 @@ static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
}
if (timer->deadline > now) return NULL;
if (GRPC_TRACER_ON(grpc_timer_trace)) {
- gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer,
- now - timer->deadline);
+ gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler",
+ timer, now - timer->deadline,
+ timer->closure->scheduler->vtable->name);
}
timer->pending = false;
grpc_timer_heap_pop(&shard->heap);
@@ -430,6 +388,10 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR,
+ (int)(shard - g_shards), n);
+ }
return n;
}
@@ -502,29 +464,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
}
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
- gpr_timespec now, gpr_timespec *next) {
+ grpc_millis *next) {
// prelude
- GPR_ASSERT(now.clock_type == g_clock_type);
- gpr_atm now_atm = timespec_to_atm_round_down(now);
+ grpc_millis now = grpc_exec_ctx_now(exec_ctx);
/* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */
- gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
- if (now_atm < min_timer) {
+ grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
+ if (now < min_timer) {
if (next != NULL) {
- *next =
- atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer));
+ *next = GPR_MIN(*next, min_timer);
}
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG,
- "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
- now_atm, min_timer);
+ "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
+ min_timer);
}
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
grpc_error *shutdown_error =
- gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
+ now != GRPC_MILLIS_INF_FUTURE
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
@@ -534,34 +494,24 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
if (next == NULL) {
next_str = gpr_strdup("NULL");
} else {
- gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
- next->tv_nsec, timespec_to_atm_round_down(*next));
+ gpr_asprintf(&next_str, "%" PRIdPTR, *next);
}
- gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR
- "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
- now.tv_sec, now.tv_nsec, now_atm, next_str,
- gpr_tls_get(&g_last_seen_min_timer),
+ gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR
+ " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
+ now, next_str, gpr_tls_get(&g_last_seen_min_timer),
gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));
gpr_free(next_str);
}
// actual code
- grpc_timer_check_result r;
- gpr_atm next_atm;
- if (next == NULL) {
- r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
- } else {
- next_atm = timespec_to_atm_round_down(*next);
- r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error);
- *next = atm_to_timespec(next_atm);
- }
+ grpc_timer_check_result r =
+ run_some_expired_timers(exec_ctx, now, next, shutdown_error);
// tracing
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
char *next_str;
if (next == NULL) {
next_str = gpr_strdup("NULL");
} else {
- gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
- next->tv_nsec, next_atm);
+ gpr_asprintf(&next_str, "%" PRIdPTR, *next);
}
gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str);
gpr_free(next_str);
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index 04ca44563d..f5dbe247ad 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -52,7 +52,7 @@ static bool g_kicked;
static bool g_has_timed_waiter;
// the deadline of the current timed waiter thread (only relevant if
// g_has_timed_waiter is true)
-static gpr_timespec g_timed_waiter_deadline;
+static grpc_millis g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
@@ -96,9 +96,8 @@ static void start_timer_thread_and_unlock(void) {
void grpc_timer_manager_tick() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- grpc_timer_check(&exec_ctx, now, &next);
+ grpc_millis next = GRPC_MILLIS_INF_FUTURE;
+ grpc_timer_check(&exec_ctx, &next);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -121,6 +120,9 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
gpr_mu_unlock(&g_mu);
}
// without our lock, flush the exec_ctx
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "flush exec_ctx");
+ }
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&g_mu);
// garbage collect any threads hanging out that are dead
@@ -133,8 +135,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
// wait until 'next' (or forever if there is already a timed waiter in the pool)
// returns true if the thread should continue executing (false if it should
// shutdown)
-static bool wait_until(gpr_timespec next) {
- const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) {
gpr_mu_lock(&g_mu);
// if we're not threaded anymore, leave
if (!g_threaded) {
@@ -168,30 +169,29 @@ static bool wait_until(gpr_timespec next) {
unless their 'next' is earlier than the current timed-waiter's deadline
(in which case the thread with earlier 'next' takes over as the new timed
waiter) */
- if (gpr_time_cmp(next, inf_future) != 0) {
- if (!g_has_timed_waiter ||
- (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) {
+ if (next != GRPC_MILLIS_INF_FUTURE) {
+ if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
my_timed_waiter_generation = ++g_timed_waiter_generation;
g_has_timed_waiter = true;
g_timed_waiter_deadline = next;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_timespec wait_time =
- gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
- wait_time.tv_sec, wait_time.tv_nsec);
+ grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx);
+ gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds",
+ wait_time);
}
} else { // g_timed_waiter == true && next >= g_timed_waiter_deadline
- next = inf_future;
+ next = GRPC_MILLIS_INF_FUTURE;
}
}
if (GRPC_TRACER_ON(grpc_timer_check_trace) &&
- gpr_time_cmp(next, inf_future) == 0) {
+ next == GRPC_MILLIS_INF_FUTURE) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
- gpr_cv_wait(&g_cv_wait, &g_mu, next);
+ gpr_cv_wait(&g_cv_wait, &g_mu,
+ grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME));
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
@@ -203,7 +203,7 @@ static bool wait_until(gpr_timespec next) {
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == g_timed_waiter_generation) {
g_has_timed_waiter = false;
- g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
}
}
@@ -219,12 +219,11 @@ static bool wait_until(gpr_timespec next) {
}
static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
- const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
- gpr_timespec next = inf_future;
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_millis next = GRPC_MILLIS_INF_FUTURE;
+ grpc_exec_ctx_invalidate_now(exec_ctx);
// check timer state, updates next to the next time to run a check
- switch (grpc_timer_check(exec_ctx, now, &next)) {
+ switch (grpc_timer_check(exec_ctx, &next)) {
case GRPC_TIMERS_FIRED:
run_some_timers(exec_ctx);
break;
@@ -241,10 +240,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
}
- next = inf_future;
+ next = GRPC_MILLIS_INF_FUTURE;
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
- if (!wait_until(next)) {
+ if (!wait_until(exec_ctx, next)) {
return;
}
break;
@@ -300,7 +299,7 @@ void grpc_timer_manager_init(void) {
g_completed_threads = NULL;
g_has_timed_waiter = false;
- g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
start_threads();
}
@@ -347,7 +346,7 @@ void grpc_kick_poller(void) {
gpr_mu_lock(&g_mu);
g_kicked = true;
g_has_timed_waiter = false;
- g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
++g_timed_waiter_generation;
gpr_cv_signal(&g_cv_wait);
gpr_mu_unlock(&g_mu);
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index a2a8e289ee..c83ecd088d 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -95,7 +95,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
/* The http call is local. If it takes more than one sec, it is for sure not
on compute engine. */
- gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
+ grpc_millis max_detection_delay = GPR_MS_PER_SEC;
grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset, &g_polling_mu);
@@ -114,7 +114,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
grpc_resource_quota_create("google_default_credentials");
grpc_httpcli_get(
exec_ctx, &context, &detector.pollent, resource_quota, &request,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
+ grpc_exec_ctx_now(exec_ctx) + max_detection_delay,
GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector,
grpc_schedule_on_exec_ctx),
&detector.response);
@@ -131,8 +131,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
"pollset_work",
grpc_pollset_work(exec_ctx,
grpc_polling_entity_pollset(&detector.pollent),
- &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC)))) {
+ &worker, GRPC_MILLIS_INF_FUTURE))) {
detector.is_done = 1;
detector.success = 0;
}
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index a27284bc50..7c24af86e8 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -380,7 +380,7 @@ void verifier_cb_ctx_destroy(grpc_exec_ctx *exec_ctx, verifier_cb_ctx *ctx) {
gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN};
/* Max delay defaults to one minute. */
-gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN};
+grpc_millis grpc_jwt_verifier_max_delay = 60 * GPR_MS_PER_SEC;
typedef struct {
char *email_domain;
@@ -707,7 +707,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_resource_quota_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
+ grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay,
GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
&ctx->responses[HTTP_RESPONSE_KEYS]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
@@ -833,10 +833,10 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
extreme memory pressure. */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("jwt_verifier");
- grpc_httpcli_get(
- exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- http_cb, &ctx->responses[rsp_idx]);
+ grpc_httpcli_get(exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent,
+ resource_quota, &req,
+ grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay,
+ http_cb, &ctx->responses[rsp_idx]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
gpr_free(req.host);
gpr_free(req.http.path);
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h
index 8fac452d4e..3a3261a780 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.h
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h
@@ -81,7 +81,7 @@ typedef struct {
/* Globals to control the verifier. Not thread-safe. */
extern gpr_timespec grpc_jwt_verifier_clock_skew;
-extern gpr_timespec grpc_jwt_verifier_max_delay;
+extern grpc_millis grpc_jwt_verifier_max_delay;
/* The verifier can be created with some custom mappings to help with key
discovery in the case where the issuer is an email address.
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index 10b270c49c..5fe1f71f47 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -117,7 +117,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx,
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_exec_ctx *exec_ctx, const grpc_http_response *response,
- grpc_mdelem *token_md, gpr_timespec *token_lifetime) {
+ grpc_mdelem *token_md, grpc_millis *token_lifetime) {
char *null_terminated_body = NULL;
char *new_access_token = NULL;
grpc_credentials_status status = GRPC_CREDENTIALS_OK;
@@ -183,9 +183,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
}
gpr_asprintf(&new_access_token, "%s %s", token_type->value,
access_token->value);
- token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10);
- token_lifetime->tv_nsec = 0;
- token_lifetime->clock_type = GPR_TIMESPAN;
+ *token_lifetime = strtol(expires_in->value, NULL, 10) * GPR_MS_PER_SEC;
if (!GRPC_MDISNULL(*token_md)) GRPC_MDELEM_UNREF(exec_ctx, *token_md);
*token_md = grpc_mdelem_from_slices(
exec_ctx,
@@ -214,7 +212,7 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)r->creds;
grpc_mdelem access_token_md = GRPC_MDNULL;
- gpr_timespec token_lifetime;
+ grpc_millis token_lifetime;
grpc_credentials_status status =
grpc_oauth2_token_fetcher_credentials_parse_server_response(
exec_ctx, &r->response, &access_token_md, &token_lifetime);
@@ -222,10 +220,9 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&c->mu);
c->token_fetch_pending = false;
c->access_token_md = GRPC_MDELEM_REF(access_token_md);
- c->token_expiration =
- status == GRPC_CREDENTIALS_OK
- ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime)
- : gpr_inf_past(GPR_CLOCK_REALTIME);
+ c->token_expiration = status == GRPC_CREDENTIALS_OK
+ ? grpc_exec_ctx_now(exec_ctx) + token_lifetime
+ : 0;
grpc_oauth2_pending_get_request_metadata *pending_request =
c->pending_requests;
c->pending_requests = NULL;
@@ -260,14 +257,12 @@ static bool oauth2_token_fetcher_get_request_metadata(
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
// Check if we can use the cached token.
- gpr_timespec refresh_threshold = gpr_time_from_seconds(
- GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN);
+ grpc_millis refresh_threshold =
+ GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS * GPR_MS_PER_SEC;
grpc_mdelem cached_access_token_md = GRPC_MDNULL;
gpr_mu_lock(&c->mu);
if (!GRPC_MDISNULL(c->access_token_md) &&
- (gpr_time_cmp(
- gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)),
- refresh_threshold) > 0)) {
+ (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) {
cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md);
}
if (!GRPC_MDISNULL(cached_access_token_md)) {
@@ -296,10 +291,10 @@ static bool oauth2_token_fetcher_get_request_metadata(
gpr_mu_unlock(&c->mu);
if (start_fetch) {
grpc_call_credentials_ref(creds);
- c->fetch_func(
- exec_ctx, grpc_credentials_metadata_request_create(creds),
- &c->httpcli_context, &c->pollent, on_oauth2_token_fetcher_http_response,
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), refresh_threshold));
+ c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds),
+ &c->httpcli_context, &c->pollent,
+ on_oauth2_token_fetcher_http_response,
+ grpc_exec_ctx_now(exec_ctx) + refresh_threshold);
}
return false;
}
@@ -340,7 +335,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
gpr_ref_init(&c->base.refcount, 1);
gpr_mu_init(&c->mu);
- c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
+ c->token_expiration = 0;
c->fetch_func = fetch_func;
c->pollent =
grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create());
@@ -358,7 +353,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = {
static void compute_engine_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent,
- grpc_iomgr_cb_func response_cb, gpr_timespec deadline) {
+ grpc_iomgr_cb_func response_cb, grpc_millis deadline) {
grpc_http_header header = {"Metadata-Flavor", "Google"};
grpc_httpcli_request request;
memset(&request, 0, sizeof(grpc_httpcli_request));
@@ -409,7 +404,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = {
static void refresh_token_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent,
- grpc_iomgr_cb_func response_cb, gpr_timespec deadline) {
+ grpc_iomgr_cb_func response_cb, grpc_millis deadline) {
grpc_google_refresh_token_credentials *c =
(grpc_google_refresh_token_credentials *)metadata_req->creds;
grpc_http_header header = {"Content-Type",
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index d9ad6691b8..5d9f24307f 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -57,7 +57,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx,
grpc_httpcli_context *http_context,
grpc_polling_entity *pollent,
grpc_iomgr_cb_func cb,
- gpr_timespec deadline);
+ grpc_millis deadline);
typedef struct grpc_oauth2_pending_get_request_metadata {
grpc_credentials_mdelem_array *md_array;
@@ -70,7 +70,7 @@ typedef struct {
grpc_call_credentials base;
gpr_mu mu;
grpc_mdelem access_token_md;
- gpr_timespec token_expiration;
+ grpc_millis token_expiration;
bool token_fetch_pending;
grpc_oauth2_pending_get_request_metadata *pending_requests;
grpc_httpcli_context httpcli_context;
@@ -100,6 +100,6 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token(
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_exec_ctx *exec_ctx, const struct grpc_http_response *response,
- grpc_mdelem *token_md, gpr_timespec *token_lifetime);
+ grpc_mdelem *token_md, grpc_millis *token_lifetime);
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */
diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c
index 3ead40d807..ad7c5036da 100644
--- a/src/core/lib/support/time_posix.c
+++ b/src/core/lib/support/time_posix.c
@@ -30,7 +30,7 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
-#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/iomgr/block_annotate.h"
static struct timespec timespec_from_gpr(gpr_timespec gts) {
struct timespec rv;
@@ -159,7 +159,7 @@ void gpr_sleep_until(gpr_timespec until) {
delta_ts = timespec_from_gpr(delta);
GRPC_SCHEDULING_START_BLOCKING_REGION;
ns_result = nanosleep(&delta_ts, NULL);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
if (ns_result == 0) {
break;
}
diff --git a/src/core/lib/support/time_windows.c b/src/core/lib/support/time_windows.c
index 40df3761c0..b8015c272f 100644
--- a/src/core/lib/support/time_windows.c
+++ b/src/core/lib/support/time_windows.c
@@ -28,7 +28,7 @@
#include <process.h>
#include <sys/timeb.h>
-#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/support/time_precise.h"
static LARGE_INTEGER g_start_time;
@@ -94,7 +94,7 @@ void gpr_sleep_until(gpr_timespec until) {
GPR_ASSERT((sleep_millis >= 0) && (sleep_millis <= INT_MAX));
GRPC_SCHEDULING_START_BLOCKING_REGION;
Sleep((DWORD)sleep_millis);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
}
}
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 7712f560b9..e3bcbb9d80 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -122,8 +122,7 @@ void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
grpc_timer_init(&exec_ctx, &alarm->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index e258719c4d..b2146cf211 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -214,7 +214,7 @@ struct grpc_call {
server, it's trailing metadata */
grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
int send_extra_metadata_count;
- gpr_timespec send_deadline;
+ grpc_millis send_deadline;
grpc_slice_buffer_stream sending_stream;
@@ -281,7 +281,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error);
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call,
void (*set_value)(grpc_status_code code,
void *user_data),
void *set_value_user_data, grpc_slice *details);
@@ -369,11 +369,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
for (i = 0; i < 2; i++) {
for (j = 0; j < 2; j++) {
- call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
}
}
- gpr_timespec send_deadline =
- gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
+ grpc_millis send_deadline = args->send_deadline;
bool immediately_cancel = false;
@@ -391,10 +390,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
- send_deadline = gpr_time_min(
- gpr_convert_clock_type(send_deadline,
- args->parent->send_deadline.clock_type),
- args->parent->send_deadline);
+ send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
@@ -549,8 +545,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
}
- get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- NULL);
+ get_final_status(exec_ctx, c, set_status_value_directly,
+ &c->final_info.final_status, NULL);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -736,13 +732,16 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
* FINAL STATUS CODE MANIPULATION
*/
-static bool get_final_status_from(
- grpc_call *call, grpc_error *error, bool allow_ok_status,
- void (*set_value)(grpc_status_code code, void *user_data),
- void *set_value_user_data, grpc_slice *details) {
+static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_error *error, bool allow_ok_status,
+ void (*set_value)(grpc_status_code code,
+ void *user_data),
+ void *set_value_user_data,
+ grpc_slice *details) {
grpc_status_code code;
grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
+ grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice,
+ NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
@@ -754,7 +753,7 @@ static bool get_final_status_from(
return true;
}
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call,
void (*set_value)(grpc_status_code code,
void *user_data),
void *set_value_user_data, grpc_slice *details) {
@@ -779,8 +778,9 @@ static void get_final_status(grpc_call *call,
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set &&
grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details)) {
+ if (get_final_status_from(exec_ctx, call, status[i].error,
+ allow_ok_status != 0, set_value,
+ set_value_user_data, details)) {
return;
}
}
@@ -788,8 +788,9 @@ static void get_final_status(grpc_call *call,
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details)) {
+ if (get_final_status_from(exec_ctx, call, status[i].error,
+ allow_ok_status != 0, set_value,
+ set_value_user_data, details)) {
return;
}
}
@@ -1330,11 +1331,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
}
if (call->is_client) {
- get_final_status(call, set_status_value_directly,
+ get_final_status(exec_ctx, call, set_status_value_directly,
call->final_op.client.status,
call->final_op.client.status_details);
} else {
- get_final_status(call, set_cancelled_value,
+ get_final_status(exec_ctx, call, set_cancelled_value,
call->final_op.server.cancelled, NULL);
}
@@ -1611,11 +1612,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
validate_filtered_metadata(exec_ctx, bctl);
GPR_TIMER_END("validate_filtered_metadata", 0);
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- call->send_deadline =
- gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
+ if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
+ call->send_deadline = md->deadline;
}
}
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index c680139cf6..27c2f5243c 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -49,7 +49,7 @@ typedef struct grpc_call_create_args {
grpc_mdelem *add_initial_metadata;
size_t add_initial_metadata_count;
- gpr_timespec send_deadline;
+ grpc_millis send_deadline;
} grpc_call_create_args;
/* Create a new call based on \a args.
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 48962e5e45..0b5ca17cc4 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -265,7 +265,7 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask, grpc_completion_queue *cq,
grpc_pollset_set *pollset_set_alternative, grpc_mdelem path_mdelem,
- grpc_mdelem authority_mdelem, gpr_timespec deadline) {
+ grpc_mdelem authority_mdelem, grpc_millis deadline) {
grpc_mdelem send_metadata[2];
size_t num_metadata = 0;
@@ -311,7 +311,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
host != NULL ? grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_ref_internal(*host))
: GRPC_MDNULL,
- deadline);
+ grpc_timespec_to_millis_round_up(deadline));
grpc_exec_ctx_finish(&exec_ctx);
return call;
}
@@ -319,7 +319,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *grpc_channel_create_pollset_set_call(
grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method,
- const grpc_slice *host, gpr_timespec deadline, void *reserved) {
+ const grpc_slice *host, grpc_millis deadline, void *reserved) {
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
exec_ctx, channel, parent_call, propagation_mask, NULL, pollset_set,
@@ -375,7 +375,8 @@ grpc_call *grpc_channel_create_registered_call(
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_call *call = grpc_channel_create_call_internal(
&exec_ctx, channel, parent_call, propagation_mask, completion_queue, NULL,
- GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline);
+ GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
+ grpc_timespec_to_millis_round_up(deadline));
grpc_exec_ctx_finish(&exec_ctx);
return call;
}
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 528bb868e2..827dd992b5 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -43,7 +43,7 @@ grpc_channel *grpc_channel_create_with_builder(
grpc_call *grpc_channel_create_pollset_set_call(
grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method,
- const grpc_slice *host, gpr_timespec deadline, void *reserved);
+ const grpc_slice *host, grpc_millis deadline, void *reserved);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 6452f0894d..5335cf111e 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -58,8 +58,7 @@ typedef struct {
grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline);
+ grpc_pollset_worker **worker, grpc_millis deadline);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
@@ -97,8 +96,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker **worker,
- gpr_timespec now,
- gpr_timespec deadline) {
+ grpc_millis deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
@@ -112,7 +110,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
w.next->prev = w.prev->next = &w;
}
w.kicked = false;
- while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
+ gpr_timespec deadline_ts =
+ grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME);
+ while (!npp->shutdown && !w.kicked &&
+ !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
if (&w == npp->root) {
npp->root = w.next;
@@ -765,7 +766,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
typedef struct {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq;
- gpr_timespec deadline;
+ grpc_millis deadline;
grpc_cq_completion *stolen_completion;
void *tag; /* for pluck */
bool first_loop;
@@ -794,8 +795,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return true;
}
}
- return !a->first_loop &&
- gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
#ifndef NDEBUG
@@ -824,7 +824,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {}
static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
- gpr_timespec now;
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -841,23 +840,21 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
dump_pending_tags(cq);
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
GRPC_CQ_INTERNAL_REF(cq, "next");
+ grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
.cq = cq,
- .deadline = deadline,
+ .deadline = deadline_millis,
.stolen_completion = NULL,
.tag = NULL,
.first_loop = true};
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
-
for (;;) {
- gpr_timespec iteration_deadline = deadline;
+ grpc_millis iteration_deadline = deadline_millis;
if (is_finished_arg.stolen_completion != NULL) {
grpc_cq_completion *c = is_finished_arg.stolen_completion;
@@ -884,7 +881,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
attempt at popping. Not doing this can potentially deadlock this
thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
- iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+ iteration_deadline = 0;
}
}
@@ -905,8 +902,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
break;
}
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop &&
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -917,7 +914,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_lock(cq->mu);
cq->num_polls++;
grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- NULL, now, iteration_deadline);
+ NULL, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -1051,8 +1048,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
- return !a->first_loop &&
- gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
@@ -1061,7 +1057,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
- gpr_timespec now;
cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1080,15 +1075,14 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
GRPC_CQ_INTERNAL_REF(cq, "pluck");
gpr_mu_lock(cq->mu);
+ grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
.cq = cq,
- .deadline = deadline,
+ .deadline = deadline_millis,
.stolen_completion = NULL,
.tag = tag,
.first_loop = true};
@@ -1140,8 +1134,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop &&
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1149,10 +1143,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
-
cq->num_polls++;
grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- &worker, now, deadline);
+ &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 6286f9159d..88e26cbeb7 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -74,7 +74,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
mdb->list.head = &calld->status;
mdb->list.tail = &calld->details;
mdb->list.count = 2;
- mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ mdb->deadline = GRPC_MILLIS_INF_FUTURE;
}
static void lame_start_transport_stream_op_batch(
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 1d0fd472d0..1d3980eb23 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -137,7 +137,7 @@ struct call_data {
bool host_set;
grpc_slice path;
grpc_slice host;
- gpr_timespec deadline;
+ grpc_millis deadline;
grpc_completion_queue *cq_new;
@@ -492,11 +492,13 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_ASSERT(calld->path_set);
rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
- rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->deadline =
+ grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_REALTIME);
rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
break;
case REGISTERED_CALL:
- *rc->data.registered.deadline = calld->deadline;
+ *rc->data.registered.deadline =
+ grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_REALTIME);
if (rc->data.registered.optional_payload) {
*rc->data.registered.optional_payload = calld->payload;
calld->payload = NULL;
@@ -739,7 +741,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)ptr;
call_data *calld = (call_data *)elem->call_data;
- gpr_timespec op_deadline;
+ grpc_millis op_deadline;
if (error == GRPC_ERROR_NONE) {
GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != NULL);
@@ -759,7 +761,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
GRPC_ERROR_REF(error);
}
op_deadline = calld->recv_initial_metadata->deadline;
- if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
+ if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
calld->deadline = op_deadline;
}
if (calld->host_set && calld->path_set) {
@@ -833,7 +835,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
memset(&args, 0, sizeof(args));
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
- args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ args.send_deadline = GRPC_MILLIS_INF_FUTURE;
grpc_call *call;
grpc_error *error = grpc_call_create(exec_ctx, &args, &call);
grpc_call_element *elem =
@@ -881,7 +883,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = (call_data *)elem->call_data;
channel_data *chand = (channel_data *)elem->channel_data;
memset(calld, 0, sizeof(call_data));
- calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c
index 5e3920b627..2e3b61b7ab 100644
--- a/src/core/lib/transport/error_utils.c
+++ b/src/core/lib/transport/error_utils.c
@@ -39,8 +39,9 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error,
return NULL;
}
-void grpc_error_get_status(grpc_error *error, gpr_timespec deadline,
- grpc_status_code *code, grpc_slice *slice,
+void grpc_error_get_status(grpc_exec_ctx *exec_ctx, grpc_error *error,
+ grpc_millis deadline, grpc_status_code *code,
+ grpc_slice *slice,
grpc_http2_error_code *http_error) {
// Start with the parent error and recurse through the tree of children
// until we find the first one that has a status code.
@@ -63,8 +64,8 @@ void grpc_error_get_status(grpc_error *error, gpr_timespec deadline,
status = (grpc_status_code)integer;
} else if (grpc_error_get_int(found_error, GRPC_ERROR_INT_HTTP2_ERROR,
&integer)) {
- status = grpc_http2_error_to_grpc_status((grpc_http2_error_code)integer,
- deadline);
+ status = grpc_http2_error_to_grpc_status(
+ exec_ctx, (grpc_http2_error_code)integer, deadline);
}
if (code != NULL) *code = status;
diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h
index e530884215..6211031a24 100644
--- a/src/core/lib/transport/error_utils.h
+++ b/src/core/lib/transport/error_utils.h
@@ -20,6 +20,7 @@
#define GRPC_CORE_LIB_TRANSPORT_ERROR_UTILS_H
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/http2_errors.h"
/// A utility function to get the status code and message to be returned
@@ -28,8 +29,9 @@
/// All attributes are pulled from the same child error. If any of the
/// attributes (code, msg, http_status) are unneeded, they can be passed as
/// NULL.
-void grpc_error_get_status(grpc_error *error, gpr_timespec deadline,
- grpc_status_code *code, grpc_slice *slice,
+void grpc_error_get_status(grpc_exec_ctx *exec_ctx, grpc_error *error,
+ grpc_millis deadline, grpc_status_code *code,
+ grpc_slice *slice,
grpc_http2_error_code *http_status);
/// A utility function to check whether there is a clear status code that
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 54388bdcda..2df9c9189c 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -74,7 +74,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) {
void grpc_metadata_batch_init(grpc_metadata_batch *batch) {
memset(batch, 0, sizeof(*batch));
- batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ batch->deadline = GRPC_MILLIS_INF_FUTURE;
}
void grpc_metadata_batch_destroy(grpc_exec_ctx *exec_ctx,
@@ -270,9 +270,7 @@ void grpc_metadata_batch_clear(grpc_exec_ctx *exec_ctx,
}
bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) {
- return batch->list.head == NULL &&
- gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type),
- batch->deadline) == 0;
+ return batch->list.head == NULL && batch->deadline == GRPC_MILLIS_INF_FUTURE;
}
size_t grpc_metadata_batch_size(grpc_metadata_batch *batch) {
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 57d298c75c..2adf1881e7 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -51,9 +51,9 @@ typedef struct grpc_metadata_batch {
grpc_mdelem_list list;
grpc_metadata_batch_callouts idx;
/** Used to calculate grpc-timeout at the point of sending,
- or gpr_inf_future if this batch does not need to send a
+ or GRPC_MILLIS_INF_FUTURE if this batch does not need to send a
grpc-timeout */
- gpr_timespec deadline;
+ grpc_millis deadline;
} grpc_metadata_batch;
void grpc_metadata_batch_init(grpc_metadata_batch *batch);
diff --git a/src/core/lib/transport/status_conversion.c b/src/core/lib/transport/status_conversion.c
index 9a76977e4b..73d20a170b 100644
--- a/src/core/lib/transport/status_conversion.c
+++ b/src/core/lib/transport/status_conversion.c
@@ -37,8 +37,9 @@ int grpc_status_to_http2_error(grpc_status_code status) {
}
}
-grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
- gpr_timespec deadline) {
+grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx *exec_ctx,
+ grpc_http2_error_code error,
+ grpc_millis deadline) {
switch (error) {
case GRPC_HTTP2_NO_ERROR:
/* should never be received */
@@ -46,7 +47,7 @@ grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
case GRPC_HTTP2_CANCEL:
/* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been
* exceeded */
- return gpr_time_cmp(gpr_now(deadline.clock_type), deadline) >= 0
+ return grpc_exec_ctx_now(exec_ctx) > deadline
? GRPC_STATUS_DEADLINE_EXCEEDED
: GRPC_STATUS_CANCELLED;
case GRPC_HTTP2_ENHANCE_YOUR_CALM:
diff --git a/src/core/lib/transport/status_conversion.h b/src/core/lib/transport/status_conversion.h
index e93f3dfd9b..a05ed7658c 100644
--- a/src/core/lib/transport/status_conversion.h
+++ b/src/core/lib/transport/status_conversion.h
@@ -20,12 +20,14 @@
#define GRPC_CORE_LIB_TRANSPORT_STATUS_CONVERSION_H
#include <grpc/grpc.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/http2_errors.h"
/* Conversion of grpc status codes to http2 error codes (for RST_STREAM) */
grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status);
-grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
- gpr_timespec deadline);
+grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx *exec_ctx,
+ grpc_http2_error_code error,
+ grpc_millis deadline);
/* Conversion of HTTP status codes (:status) to grpc status codes */
grpc_status_code grpc_http2_status_to_grpc_status(int status);
diff --git a/src/core/lib/transport/timeout_encoding.c b/src/core/lib/transport/timeout_encoding.c
index 02f179d6a3..779d016ca6 100644
--- a/src/core/lib/transport/timeout_encoding.c
+++ b/src/core/lib/transport/timeout_encoding.c
@@ -59,60 +59,27 @@ static void enc_seconds(char *buffer, int64_t sec) {
}
}
-static void enc_nanos(char *buffer, int64_t x) {
+static void enc_millis(char *buffer, int64_t x) {
x = round_up_to_three_sig_figs(x);
- if (x < 100000) {
- if (x % 1000 == 0) {
- enc_ext(buffer, x / 1000, 'u');
- } else {
- enc_ext(buffer, x, 'n');
- }
- } else if (x < 100000000) {
- if (x % 1000000 == 0) {
- enc_ext(buffer, x / 1000000, 'm');
- } else {
- enc_ext(buffer, x / 1000, 'u');
- }
- } else if (x < 1000000000) {
- enc_ext(buffer, x / 1000000, 'm');
+ if (x < GPR_MS_PER_SEC) {
+ enc_ext(buffer, x, 'm');
} else {
- /* note that this is only ever called with times of less than one second,
- so if we reach here the time must have been rounded up to a whole second
- (and no more) */
- memcpy(buffer, "1S", 3);
- }
-}
-
-static void enc_micros(char *buffer, int64_t x) {
- x = round_up_to_three_sig_figs(x);
- if (x < 100000) {
- if (x % 1000 == 0) {
- enc_ext(buffer, x / 1000, 'm');
+ if (x % GPR_MS_PER_SEC == 0) {
+ enc_seconds(buffer, x / GPR_MS_PER_SEC);
} else {
- enc_ext(buffer, x, 'u');
+ enc_ext(buffer, x, 'm');
}
- } else if (x < 100000000) {
- if (x % 1000000 == 0) {
- enc_ext(buffer, x / 1000000, 'S');
- } else {
- enc_ext(buffer, x / 1000, 'm');
- }
- } else {
- enc_ext(buffer, x / 1000000, 'S');
}
}
-void grpc_http2_encode_timeout(gpr_timespec timeout, char *buffer) {
- if (timeout.tv_sec < 0) {
+void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer) {
+ if (timeout <= 0) {
enc_tiny(buffer);
- } else if (timeout.tv_sec == 0) {
- enc_nanos(buffer, timeout.tv_nsec);
- } else if (timeout.tv_sec < 1000 && timeout.tv_nsec != 0) {
- enc_micros(buffer,
- (int64_t)(timeout.tv_sec * 1000000) +
- (timeout.tv_nsec / 1000 + (timeout.tv_nsec % 1000 != 0)));
+ } else if (timeout < 1000 * GPR_MS_PER_SEC) {
+ enc_millis(buffer, timeout);
} else {
- enc_seconds(buffer, timeout.tv_sec + (timeout.tv_nsec != 0));
+ enc_seconds(buffer,
+ timeout / GPR_MS_PER_SEC + (timeout % GPR_MS_PER_SEC != 0));
}
}
@@ -121,7 +88,7 @@ static int is_all_whitespace(const char *p, const char *end) {
return p == end;
}
-int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) {
+int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout) {
int32_t x = 0;
const uint8_t *p = GRPC_SLICE_START_PTR(text);
const uint8_t *end = GRPC_SLICE_END_PTR(text);
@@ -136,7 +103,7 @@ int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) {
/* spec allows max. 8 digits, but we allow values up to 1,000,000,000 */
if (x >= (100 * 1000 * 1000)) {
if (x != (100 * 1000 * 1000) || digit != 0) {
- *timeout = gpr_inf_future(GPR_TIMESPAN);
+ *timeout = GRPC_MILLIS_INF_FUTURE;
return 1;
}
}
@@ -150,22 +117,22 @@ int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) {
/* decode unit specifier */
switch (*p) {
case 'n':
- *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN);
+ *timeout = x / GPR_NS_PER_MS + (x % GPR_NS_PER_MS != 0);
break;
case 'u':
- *timeout = gpr_time_from_micros(x, GPR_TIMESPAN);
+ *timeout = x / GPR_US_PER_MS + (x % GPR_US_PER_MS != 0);
break;
case 'm':
- *timeout = gpr_time_from_millis(x, GPR_TIMESPAN);
+ *timeout = x;
break;
case 'S':
- *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN);
+ *timeout = x * GPR_MS_PER_SEC;
break;
case 'M':
- *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN);
+ *timeout = x * 60 * GPR_MS_PER_SEC;
break;
case 'H':
- *timeout = gpr_time_from_hours(x, GPR_TIMESPAN);
+ *timeout = x * 60 * 60 * GPR_MS_PER_SEC;
break;
default:
return 0;
diff --git a/src/core/lib/transport/timeout_encoding.h b/src/core/lib/transport/timeout_encoding.h
index 7ff35c4083..c8677719c4 100644
--- a/src/core/lib/transport/timeout_encoding.h
+++ b/src/core/lib/transport/timeout_encoding.h
@@ -22,13 +22,14 @@
#include <grpc/slice.h>
#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/string.h"
#define GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE (GPR_LTOA_MIN_BUFSIZE + 1)
/* Encode/decode timeouts to the GRPC over HTTP/2 format;
encoding may round up arbitrarily */
-void grpc_http2_encode_timeout(gpr_timespec timeout, char *buffer);
-int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout);
+void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer);
+int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout);
#endif /* GRPC_CORE_LIB_TRANSPORT_TIMEOUT_ENCODING_H */
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index 409a6c4103..2d0b1e78ea 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -48,10 +48,9 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
put_metadata(b, m->md);
}
- if (gpr_time_cmp(md.deadline, gpr_inf_future(md.deadline.clock_type)) != 0) {
+ if (md.deadline != GRPC_MILLIS_INF_FUTURE) {
char *tmp;
- gpr_asprintf(&tmp, " deadline=%" PRId64 ".%09d", md.deadline.tv_sec,
- md.deadline.tv_nsec);
+ gpr_asprintf(&tmp, " deadline=%" PRIdPTR, md.deadline);
gpr_strvec_add(b, tmp);
}
}