diff options
35 files changed, 302 insertions, 360 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 5788819331..bdaf567d6a 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -116,7 +116,7 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** next connect attempt time */ - gpr_timespec next_attempt; + grpc_millis next_attempt; /** backoff state */ gpr_backoff backoff_state; /** do we have an active alarm? */ @@ -463,7 +463,6 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, c->connecting = true; GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); if (!c->backoff_begun) { c->backoff_begun = true; c->next_attempt = gpr_backoff_begin(&c->backoff_state, now); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index f207155900..feb95796b3 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -132,8 +132,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. - const gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + const grpc_millis deadline = + grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC; grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline, acceptor, on_handshake_done, connection_state); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 731ebf400f..ffcd715cf6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -367,36 +367,27 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_policy = (grpc_chttp2_repeated_ping_policy){ .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA, - .min_time_between_pings = - gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), + .min_time_between_pings = DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, .max_ping_strikes = DEFAULT_MAX_PING_STRIKES, - .min_ping_interval_without_data = gpr_time_from_millis( - DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, GPR_TIMESPAN), + .min_ping_interval_without_data = + DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, }; /* Keepalive setting */ if (t->is_client) { - t->keepalive_time = - g_default_client_keepalive_time_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_client_keepalive_time_ms, - GPR_TIMESPAN); - t->keepalive_timeout = - g_default_client_keepalive_timeout_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_client_keepalive_timeout_ms, - GPR_TIMESPAN); + t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_time_ms; + t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_timeout_ms; } else { - t->keepalive_time = - g_default_server_keepalive_time_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_server_keepalive_time_ms, - GPR_TIMESPAN); - t->keepalive_timeout = - g_default_server_keepalive_timeout_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_server_keepalive_timeout_ms, - GPR_TIMESPAN); + t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_time_ms; + t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_timeout_ms; } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; @@ -439,21 +430,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, (grpc_integer_options){DEFAULT_MAX_PING_STRIKES, 0, INT_MAX}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) { - t->ping_policy.min_time_between_pings = gpr_time_from_millis( - grpc_channel_arg_get_integer( - &channel_args->args[i], - (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0, - INT_MAX}), - GPR_TIMESPAN); + t->ping_policy.min_time_between_pings = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0, + INT_MAX}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_MIN_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_ping_interval_without_data = gpr_time_from_millis( + t->ping_policy.min_ping_interval_without_data = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){ - DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}), - GPR_TIMESPAN); + DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( @@ -471,9 +459,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, ? g_default_client_keepalive_time_ms : g_default_server_keepalive_time_ms, 1, INT_MAX}); - t->keepalive_time = value == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { const int value = grpc_channel_arg_get_integer( @@ -482,9 +468,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, ? g_default_client_keepalive_timeout_ms : g_default_server_keepalive_timeout_ms, 0, INT_MAX}); - t->keepalive_timeout = value == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + t->keepalive_timeout = + value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { t->keepalive_permit_without_calls = @@ -569,17 +554,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; - t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.last_ping_recv_time = 0; t->ping_recv_state.ping_strikes = 0; /* Start keepalive pings */ - if (gpr_time_cmp(t->keepalive_time, gpr_inf_future(GPR_TIMESPAN)) != 0) { + if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no inflight keeaplive timers */ @@ -1001,14 +985,12 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug " "data equal to \"too_many_pings\""); - double current_keepalive_time_ms = - gpr_timespec_to_micros(t->keepalive_time) / 1000; + double current_keepalive_time_ms = (double)t->keepalive_time; t->keepalive_time = current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis((int64_t)(current_keepalive_time_ms * - KEEPALIVE_TIME_BACKOFF_MULTIPLIER), - GPR_TIMESPAN); + ? GRPC_MILLIS_INF_FUTURE + : (grpc_millis)(current_keepalive_time_ms * + KEEPALIVE_TIME_BACKOFF_MULTIPLIER); } /* lie: use transient failure from the transport to indicate goaway has been @@ -2421,18 +2403,16 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, &t->finish_keepalive_ping_locked); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); } @@ -2441,10 +2421,9 @@ static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); - grpc_timer_init( - exec_ctx, &t->keepalive_watchdog_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), - &t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->keepalive_watchdog_fired_locked); } static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -2455,10 +2434,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 3d7c6fbfad..31673e622d 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -89,10 +89,9 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes); } else { if (!t->is_client) { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec next_allowed_ping = - gpr_time_add(t->ping_recv_state.last_ping_recv_time, - t->ping_policy.min_ping_interval_without_data); + grpc_millis next_allowed_ping = + t->ping_recv_state.last_ping_recv_time + + t->ping_policy.min_ping_interval_without_data; if (t->keepalive_permit_without_calls == 0 && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -100,15 +99,14 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, no less than two hours. When there is no outstanding streams, we restrict the number of PINGS equivalent to TCP Keep-Alive. */ next_allowed_ping = - gpr_time_add(t->ping_recv_state.last_ping_recv_time, - gpr_time_from_seconds(7200, GPR_TIMESPAN)); + t->ping_recv_state.last_ping_recv_time + 7200 * GPR_MS_PER_SEC; } - if (gpr_time_cmp(next_allowed_ping, now) > 0) { + if (next_allowed_ping > grpc_exec_ctx_now(exec_ctx)) { grpc_chttp2_add_ping_strike(exec_ctx, t); } - t->ping_recv_state.last_ping_recv_time = now; + t->ping_recv_state.last_ping_recv_time = grpc_exec_ctx_now(exec_ctx); } if (!g_disable_ping_ack) { if (t->ping_ack_count == t->ping_ack_capacity) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 4563b78e75..44ea3bdc61 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -85,21 +85,21 @@ typedef struct { } grpc_chttp2_ping_queue; typedef struct { - gpr_timespec min_time_between_pings; + grpc_millis min_time_between_pings; int max_pings_without_data; int max_ping_strikes; - gpr_timespec min_ping_interval_without_data; + grpc_millis min_ping_interval_without_data; } grpc_chttp2_repeated_ping_policy; typedef struct { - gpr_timespec last_ping_sent_time; + grpc_millis last_ping_sent_time; int pings_before_data_required; grpc_timer delayed_ping_timer; bool is_delayed_ping_timer_set; } grpc_chttp2_repeated_ping_state; typedef struct { - gpr_timespec last_ping_recv_time; + grpc_millis last_ping_recv_time; int ping_strikes; } grpc_chttp2_server_ping_recv_state; @@ -406,9 +406,9 @@ struct grpc_chttp2_transport { /** watchdog to kill the transport when waiting for the keepalive ping */ grpc_timer keepalive_watchdog_timer; /** time duration in between pings */ - gpr_timespec keepalive_time; + grpc_millis keepalive_time; /** grace period for a ping to complete before watchdog kicks in */ - gpr_timespec keepalive_timeout; + grpc_millis keepalive_timeout; /** if keepalive pings are allowed when there's no outstanding streams */ bool keepalive_permit_without_calls; /** keep-alive state machine state */ diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 315f2a67a2..3e16fda6dc 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -76,12 +76,12 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, } return; } - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time); + grpc_millis elapsed = + grpc_exec_ctx_now(exec_ctx) - t->ping_state.last_ping_sent_time; /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec, elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec, (int)t->ping_policy.min_time_between_pings.tv_nsec);*/ - if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) { + if (elapsed < t->ping_policy.min_time_between_pings) { /* not enough elapsed time between successive pings */ if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { @@ -92,10 +92,9 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, if (!t->ping_state.is_delayed_ping_timer_set) { t->ping_state.is_delayed_ping_timer_set = true; grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer, - gpr_time_add(t->ping_state.last_ping_sent_time, - t->ping_policy.min_time_between_pings), - &t->retry_initiate_ping_locked, - gpr_now(GPR_CLOCK_MONOTONIC)); + t->ping_state.last_ping_sent_time + + t->ping_policy.min_time_between_pings, + &t->retry_initiate_ping_locked); } return; } @@ -116,7 +115,7 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_ping_create(false, pq->inflight_id)); - t->ping_state.last_ping_sent_time = now; + t->ping_state.last_ping_sent_time = grpc_exec_ctx_now(exec_ctx); t->ping_state.pings_before_data_required -= (t->ping_state.pings_before_data_required != 0); } @@ -286,6 +285,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( s->sent_initial_metadata = true; sent_initial_metadata = true; } + /* send any window updates */ if (s->announce_window > 0) { uint32_t announce = s->announce_window; @@ -295,8 +295,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { - t->ping_recv_state.last_ping_recv_time = - gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.last_ping_recv_time = 0; t->ping_recv_state.ping_strikes = 0; } GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce); @@ -332,8 +331,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { - t->ping_recv_state.last_ping_recv_time = - gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.last_ping_recv_time = 0; t->ping_recv_state.ping_strikes = 0; } if (is_last_frame) { @@ -425,8 +423,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { - t->ping_recv_state.last_ping_recv_time = - gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.last_ping_recv_time = 0; t->ping_recv_state.ping_strikes = 0; } } diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 2cb83f4114..c4f57a5cc8 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -231,7 +231,7 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data) { gpr_mu_lock(&mgr->mu); GPR_ASSERT(mgr->index == 0); @@ -253,9 +253,7 @@ void grpc_handshake_manager_do_handshake( gpr_ref(&mgr->refs); GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &mgr->deadline_timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, deadline, &mgr->on_timeout); // Start first handshaker, which also owns a ref. gpr_ref(&mgr->refs); bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index eb9a59bd08..09b4a27c1c 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -145,7 +145,7 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data); /// Add \a mgr to the server side list of all pending handshake managers, the diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 77af7b7c08..97db3a3c57 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -44,7 +44,7 @@ typedef struct { grpc_endpoint *ep; char *host; char *ssl_host_override; - gpr_timespec deadline; + grpc_millis deadline; int have_read_byte; const grpc_httpcli_handshaker *handshaker; grpc_closure *on_done; @@ -65,7 +65,7 @@ static grpc_httpcli_post_override g_post_override = NULL; static void plaintext_handshake(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint, const char *host, - gpr_timespec deadline, + grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { @@ -240,7 +240,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_done, + grpc_millis deadline, grpc_closure *on_done, grpc_httpcli_response *response, const char *name, grpc_slice request_text) { internal_request *req = gpr_malloc(sizeof(internal_request)); @@ -277,9 +277,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, - const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_done, - grpc_httpcli_response *response) { + const grpc_httpcli_request *request, grpc_millis deadline, + grpc_closure *on_done, grpc_httpcli_response *response) { char *name; if (g_get_override && g_get_override(exec_ctx, request, deadline, on_done, response)) { @@ -297,7 +296,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_closure *on_done, + grpc_millis deadline, grpc_closure *on_done, grpc_httpcli_response *response) { char *name; if (g_post_override && diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 809618695e..a3baf512a3 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -42,7 +42,7 @@ typedef struct grpc_httpcli_context { typedef struct { const char *default_port; void (*handshake)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint, - const char *host, gpr_timespec deadline, + const char *host, grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)); } grpc_httpcli_handshaker; @@ -83,8 +83,8 @@ void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, - const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_complete, + const grpc_httpcli_request *request, grpc_millis deadline, + grpc_closure *on_complete, grpc_httpcli_response *response); /* Asynchronously perform a HTTP POST. @@ -106,18 +106,18 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_closure *on_complete, + grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); /* override functions return 1 if they handled the request, 0 otherwise */ typedef int (*grpc_httpcli_get_override)(grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request, - gpr_timespec deadline, + grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); typedef int (*grpc_httpcli_post_override)( grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request, - const char *body_bytes, size_t body_size, gpr_timespec deadline, + const char *body_bytes, size_t body_size, grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); void grpc_httpcli_set_override(grpc_httpcli_get_override get, diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 97c2886525..37ee08783e 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -155,7 +155,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, const char *host, - gpr_timespec deadline, + grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { on_done_closure *c = gpr_malloc(sizeof(*c)); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 1f8d7eef26..4dc22fbcea 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <string.h> #include <sys/socket.h> @@ -198,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -846,7 +847,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) { static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; @@ -914,7 +915,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd_watcher *watchers; struct pollfd *pfds; - timeout = poll_deadline_to_millis_timeout(deadline, now); + timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -1024,13 +1025,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + deadline = 0; } keep_polling = 1; } - if (keep_polling) { - now = gpr_now(now.clock_type); - } } gpr_tls_set(&g_current_thread_poller, 0); if (added_worker) { @@ -1082,21 +1080,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; + if (deadline == 0) return 0; + grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + if (n < 0) return 0; + if (n > INT_MAX) return -1; + return (int)n; } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index cff77e641c..682794f527 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -208,9 +208,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); + grpc_pollset_worker **worker, + grpc_millis deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline); } grpc_error *grpc_pollset_kick(grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 0de7333843..21279877e8 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -52,8 +52,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, + grpc_millis deadline); grpc_error *(*pollset_kick)(grpc_pollset *pollset, grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 833170ceed..5d2fc4af86 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -108,9 +108,42 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -void grpc_exec_ctx_global_init(void) {} +static gpr_timespec g_start_time; + +void grpc_exec_ctx_global_init(void) { + g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); +} + void grpc_exec_ctx_global_shutdown(void) {} +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { + exec_ctx->now_is_valid = false; +} + +gpr_timespec grpc_millis_to_timespec(grpc_exec_ctx *exec_ctx, + grpc_millis millis, + gpr_clock_type clock_type) { + return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type), + gpr_time_from_millis(millis, GPR_TIMESPAN)); +} + static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index a0d2a965d5..8e030bf741 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -19,9 +19,13 @@ #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H +#include <grpc/support/atm.h> + #include "src/core/lib/iomgr/closure.h" -/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ +typedef gpr_atm grpc_millis; + +#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ @@ -64,12 +68,18 @@ struct grpc_exec_ctx { uintptr_t flags; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); + + bool now_is_valid; + grpc_millis now; }; /* initializer for grpc_exec_ctx: prefer to use GRPC_EXEC_CTX_INIT whenever possible */ -#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ - { GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check } +#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ + { \ + GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check, \ + false, 0 \ + } /* initialize an execution context at the top level of an API call into grpc (this is safe to use elsewhere, though possibly not as efficient) */ @@ -101,4 +111,11 @@ void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx); +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx); +gpr_timespec grpc_millis_to_timespec(grpc_exec_ctx *exec_ctx, + grpc_millis millis, gpr_clock_type clock); +grpc_millis grpc_timespec_to_millis(grpc_exec_ctx *exec_ctx, + gpr_timespec timespec); + #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 3d19953eeb..5e15b01ffd 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -47,8 +47,14 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); +<<<<<<< HEAD + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_timer_list_init(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); +======= grpc_executor_init(exec_ctx); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); +>>>>>>> 4708a21c8144f9fed3c90ea8fa153aa20302a1d7 g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_network_status_init(); @@ -95,8 +101,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == - GRPC_TIMERS_FIRED) { + exec_ctx->now_is_valid = true; + exec_ctx->now = GRPC_MILLIS_INF_FUTURE; + if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index a609a3877a..49d4251894 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -71,8 +71,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); pollset lock */ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) GRPC_MUST_USE_RESULT; + grpc_pollset_worker **worker, + grpc_millis deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is non-NULL, then kick that worker. */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 6c9e51ae84..c04fb8bc07 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -35,6 +35,6 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline); + grpc_millis deadline); #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 21e320a6e7..992e1868c0 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -242,7 +242,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -322,9 +322,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -338,14 +336,14 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index b92b8fb8b8..89c997bdca 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -41,8 +41,7 @@ typedef struct grpc_timer grpc_timer; application callback is also responsible for maintaining information about when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now); + grpc_millis deadline, grpc_closure *closure); /* Note that there is no timer destroy function. This is because the timer is a one-time occurrence with a guarantee that the callback will @@ -88,8 +87,8 @@ typedef enum { with high probability at least one thread in the system will see an update at any time slice. */ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next); -void grpc_timer_list_init(gpr_timespec now); + grpc_millis *next); +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); /* Consume a kick issued by grpc_kick_poller */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 12efce241f..11d0b338f1 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -99,9 +99,6 @@ static struct shared_mutables g_shared_mutables = { .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, }; -static gpr_clock_type g_clock_type; -static gpr_timespec g_start_time; - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -114,51 +111,18 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm *next, grpc_error *error); -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - -static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = GPR_MS_PER_SEC * (double)ts.tv_sec + - (double)ts.tv_nsec / GPR_NS_PER_MS + - (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = - GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_timespec atm_to_timespec(gpr_atm x) { - return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); -} - static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_timer_list_init(gpr_timespec now) { +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { uint32_t i; g_shared_mutables.initialized = true; gpr_mu_init(&g_shared_mutables.mu); - g_clock_type = now.clock_type; - g_start_time = now; - g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); grpc_register_tracer(&grpc_timer_trace); @@ -193,10 +157,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { g_shared_mutables.initialized = false; } -static double ts_to_dbl(gpr_timespec ts) { - return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; -} - /* returns true if the first element in the list */ static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; @@ -235,20 +195,15 @@ static void note_deadline_change(timer_shard *shard) { } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { int is_first_timer = 0; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; - GPR_ASSERT(deadline.clock_type == g_clock_type); - GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline); if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR - "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", - timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec, - now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); + gpr_log(GPR_DEBUG, + "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, + deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); } if (!g_shared_mutables.initialized) { @@ -261,7 +216,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_mu_lock(&shard->mu); timer->pending = true; - if (gpr_time_cmp(deadline, now) <= 0) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline <= now) { timer->pending = false; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); @@ -269,9 +225,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, return; } - grpc_time_averaged_stats_add_sample(&shard->stats, - ts_to_dbl(gpr_time_sub(deadline, now))); - if (deadline_atm < shard->queue_deadline_cap) { + grpc_time_averaged_stats_add_sample(&shard->stats, (deadline - now) / 1000.0); + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -302,12 +257,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, shard->min_deadline); } - if (deadline_atm < shard->min_deadline) { + if (deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline_atm; + shard->min_deadline = deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) { - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm); + if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline); grpc_kick_poller(); } } @@ -500,29 +455,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { // prelude - GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm_round_down(now); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ - gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); - if (now_atm < min_timer) { + grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { if (next != NULL) { - *next = - atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); + *next = GPR_MIN(*next, min_timer); } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, - "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, - now_atm, min_timer); + "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, + min_timer); } return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + now != GRPC_MILLIS_INF_FUTURE ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); @@ -532,34 +485,23 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, timespec_to_atm_round_down(*next)); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR - "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, - now.tv_sec, now.tv_nsec, now_atm, next_str, - gpr_tls_get(&g_last_seen_min_timer), + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR + " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } // actual code - grpc_timer_check_result r; - gpr_atm next_atm; - if (next == NULL) { - r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); - } else { - next_atm = timespec_to_atm_round_down(*next); - r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); - *next = atm_to_timespec(next_atm); - } + bool r = run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, next_atm); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 631f7935d9..751314c23f 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -52,7 +52,7 @@ static bool g_kicked; static bool g_has_timed_waiter; // the deadline of the current timed waiter thread (only relevant if // g_has_timed_waiter is true) -static gpr_timespec g_timed_waiter_deadline; +static grpc_millis g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -96,9 +96,8 @@ static void start_timer_thread_and_unlock(void) { void grpc_timer_manager_tick() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_timer_check(&exec_ctx, now, &next); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_timer_check(&exec_ctx, &next); grpc_exec_ctx_finish(&exec_ctx); } @@ -133,8 +132,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { // wait until 'next' (or forever if there is already a timed waiter in the pool) // returns true if the thread should continue executing (false if it should // shutdown) -static bool wait_until(gpr_timespec next) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); +static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) { gpr_mu_lock(&g_mu); // if we're not threaded anymore, leave if (!g_threaded) { @@ -168,30 +166,29 @@ static bool wait_until(gpr_timespec next) { unless their 'next' is earlier than the current timed-waiter's deadline (in which case the thread with earlier 'next' takes over as the new timed waiter) */ - if (gpr_time_cmp(next, inf_future) != 0) { - if (!g_has_timed_waiter || - (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + if (next != GRPC_MILLIS_INF_FUTURE) { + if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) { my_timed_waiter_generation = ++g_timed_waiter_generation; g_has_timed_waiter = true; g_timed_waiter_deadline = next; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = - gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); + gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", + wait_time); } } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; } } if (GRPC_TRACER_ON(grpc_timer_check_trace) && - gpr_time_cmp(next, inf_future) == 0) { + next == GRPC_MILLIS_INF_FUTURE) { gpr_log(GPR_DEBUG, "sleep until kicked"); } - gpr_cv_wait(&g_cv_wait, &g_mu, next); + gpr_cv_wait(&g_cv_wait, &g_mu, + grpc_millis_to_timespec(exec_ctx, next, GPR_CLOCK_REALTIME)); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", @@ -203,7 +200,7 @@ static bool wait_until(gpr_timespec next) { // there's work to do after checking timers (code above) if (my_timed_waiter_generation == g_timed_waiter_generation) { g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; } } @@ -219,12 +216,10 @@ static bool wait_until(gpr_timespec next) { } static void timer_main_loop(grpc_exec_ctx *exec_ctx) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = inf_future; - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; // check timer state, updates next to the next time to run a check - switch (grpc_timer_check(exec_ctx, now, &next)) { + switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: run_some_timers(exec_ctx); break; @@ -241,10 +236,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: - if (!wait_until(next)) { + if (!wait_until(exec_ctx, next)) { return; } break; @@ -300,7 +295,7 @@ void grpc_timer_manager_init(void) { g_completed_threads = NULL; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; start_threads(); } @@ -347,7 +342,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index a2a8e289ee..c83ecd088d 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -95,7 +95,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { /* The http call is local. If it takes more than one sec, it is for sure not on compute engine. */ - gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); + grpc_millis max_detection_delay = GPR_MS_PER_SEC; grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(pollset, &g_polling_mu); @@ -114,7 +114,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { grpc_resource_quota_create("google_default_credentials"); grpc_httpcli_get( exec_ctx, &context, &detector.pollent, resource_quota, &request, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), + grpc_exec_ctx_now(exec_ctx) + max_detection_delay, GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector, grpc_schedule_on_exec_ctx), &detector.response); @@ -131,8 +131,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { "pollset_work", grpc_pollset_work(exec_ctx, grpc_polling_entity_pollset(&detector.pollent), - &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + &worker, GRPC_MILLIS_INF_FUTURE))) { detector.is_done = 1; detector.success = 0; } diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 6cd558d123..2e739ee26e 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -380,7 +380,7 @@ void verifier_cb_ctx_destroy(grpc_exec_ctx *exec_ctx, verifier_cb_ctx *ctx) { gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN}; /* Max delay defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN}; +grpc_millis grpc_jwt_verifier_max_delay = 60 * GPR_MS_PER_SEC; typedef struct { char *email_domain; @@ -702,7 +702,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, grpc_resource_quota_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), + grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay, GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); @@ -828,10 +828,10 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, extreme memory pressure. */ grpc_resource_quota *resource_quota = grpc_resource_quota_create("jwt_verifier"); - grpc_httpcli_get( - exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - http_cb, &ctx->responses[rsp_idx]); + grpc_httpcli_get(exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, + resource_quota, &req, + grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay, + http_cb, &ctx->responses[rsp_idx]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); gpr_free(req.host); gpr_free(req.http.path); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h index 8fac452d4e..3a3261a780 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.h +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h @@ -81,7 +81,7 @@ typedef struct { /* Globals to control the verifier. Not thread-safe. */ extern gpr_timespec grpc_jwt_verifier_clock_skew; -extern gpr_timespec grpc_jwt_verifier_max_delay; +extern grpc_millis grpc_jwt_verifier_max_delay; /* The verifier can be created with some custom mappings to help with key discovery in the case where the issuer is an email address. diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index 9de561b310..f859535e89 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -115,7 +115,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx, grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( grpc_exec_ctx *exec_ctx, const grpc_http_response *response, - grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime) { + grpc_credentials_md_store **token_md, grpc_millis *token_lifetime) { char *null_terminated_body = NULL; char *new_access_token = NULL; grpc_credentials_status status = GRPC_CREDENTIALS_OK; @@ -181,9 +181,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response( } gpr_asprintf(&new_access_token, "%s %s", token_type->value, access_token->value); - token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10); - token_lifetime->tv_nsec = 0; - token_lifetime->clock_type = GPR_TIMESPAN; + *token_lifetime = strtol(expires_in->value, NULL, 10) * GPR_MS_PER_SEC; if (*token_md != NULL) grpc_credentials_md_store_unref(exec_ctx, *token_md); *token_md = grpc_credentials_md_store_create(1); grpc_credentials_md_store_add_cstrings( @@ -209,7 +207,7 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, (grpc_credentials_metadata_request *)user_data; grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)r->creds; - gpr_timespec token_lifetime; + grpc_millis token_lifetime; grpc_credentials_status status; GRPC_LOG_IF_ERROR("oauth_fetch", GRPC_ERROR_REF(error)); @@ -218,12 +216,11 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, status = grpc_oauth2_token_fetcher_credentials_parse_server_response( exec_ctx, &r->response, &c->access_token_md, &token_lifetime); if (status == GRPC_CREDENTIALS_OK) { - c->token_expiration = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime); + c->token_expiration = grpc_exec_ctx_now(exec_ctx) + token_lifetime; r->cb(exec_ctx, r->user_data, c->access_token_md->entries, c->access_token_md->num_entries, GRPC_CREDENTIALS_OK, NULL); } else { - c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); + c->token_expiration = 0; r->cb(exec_ctx, r->user_data, NULL, 0, status, "Error occured when fetching oauth2 token."); } @@ -237,15 +234,14 @@ static void oauth2_token_fetcher_get_request_metadata( grpc_credentials_metadata_cb cb, void *user_data) { grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)creds; - gpr_timespec refresh_threshold = gpr_time_from_seconds( - GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); + grpc_millis refresh_threshold = + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS * GPR_MS_PER_SEC; grpc_credentials_md_store *cached_access_token_md = NULL; { gpr_mu_lock(&c->mu); if (c->access_token_md != NULL && - (gpr_time_cmp( - gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)), - refresh_threshold) > 0)) { + (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > + refresh_threshold)) { cached_access_token_md = grpc_credentials_md_store_ref(c->access_token_md); } @@ -260,7 +256,7 @@ static void oauth2_token_fetcher_get_request_metadata( exec_ctx, grpc_credentials_metadata_request_create(creds, cb, user_data), &c->httpcli_context, pollent, on_oauth2_token_fetcher_http_response, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold)); + grpc_exec_ctx_now(exec_ctx) + refresh_threshold); } } @@ -270,7 +266,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); + c->token_expiration = 0; c->fetch_func = fetch_func; grpc_httpcli_context_init(&c->httpcli_context); } @@ -285,7 +281,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = { static void compute_engine_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent, - grpc_iomgr_cb_func response_cb, gpr_timespec deadline) { + grpc_iomgr_cb_func response_cb, grpc_millis deadline) { grpc_http_header header = {"Metadata-Flavor", "Google"}; grpc_httpcli_request request; memset(&request, 0, sizeof(grpc_httpcli_request)); @@ -336,7 +332,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = { static void refresh_token_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent, - grpc_iomgr_cb_func response_cb, gpr_timespec deadline) { + grpc_iomgr_cb_func response_cb, grpc_millis deadline) { grpc_google_refresh_token_credentials *c = (grpc_google_refresh_token_credentials *)metadata_req->creds; grpc_http_header header = {"Content-Type", diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index 72093afe9e..d5ada36368 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -57,12 +57,12 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *http_context, grpc_polling_entity *pollent, grpc_iomgr_cb_func cb, - gpr_timespec deadline); + grpc_millis deadline); typedef struct { grpc_call_credentials base; gpr_mu mu; grpc_credentials_md_store *access_token_md; - gpr_timespec token_expiration; + grpc_millis token_expiration; grpc_httpcli_context httpcli_context; grpc_fetch_oauth2_func fetch_func; } grpc_oauth2_token_fetcher_credentials; @@ -89,6 +89,6 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token( grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( grpc_exec_ctx *exec_ctx, const struct grpc_http_response *response, - grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime); + grpc_credentials_md_store **token_md, grpc_millis *token_lifetime); #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */ diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..911ac010e0 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -53,8 +53,8 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timespec_to_millis(&exec_ctx, deadline), + &alarm->on_alarm); grpc_exec_ctx_finish(&exec_ctx); return alarm; } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 978d7b4171..cefa8a2a89 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -57,8 +57,7 @@ typedef struct { grpc_error *(*kick)(grpc_pollset *pollset, grpc_pollset_worker *specific_worker); grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, grpc_millis deadline); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); @@ -96,8 +95,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx, static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker, - gpr_timespec now, - gpr_timespec deadline) { + grpc_millis deadline) { non_polling_poller *npp = (non_polling_poller *)pollset; if (npp->shutdown) return GRPC_ERROR_NONE; non_polling_worker w; @@ -111,7 +109,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, w.next->prev = w.prev->next = &w; } w.kicked = false; - while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + gpr_timespec deadline_ts = + grpc_millis_to_timespec(exec_ctx, deadline, GPR_CLOCK_REALTIME); + while (!npp->shutdown && !w.kicked && + !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; if (&w == npp->root) { npp->root = w.next; @@ -738,7 +739,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, typedef struct { gpr_atm last_seen_things_queued_ever; grpc_completion_queue *cq; - gpr_timespec deadline; + grpc_millis deadline; grpc_cq_completion *stolen_completion; void *tag; /* for pluck */ bool first_loop; @@ -767,8 +768,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { return true; } } - return !a->first_loop && - gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); } #ifndef NDEBUG @@ -797,7 +797,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {} static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; - gpr_timespec now; cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -814,23 +813,22 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, dump_pending_tags(cq); - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cq, "next"); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cq, - .deadline = deadline, + .deadline = 0, .stolen_completion = NULL, .tag = NULL, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); - + grpc_millis deadline_millis = is_finished_arg.deadline = + grpc_timespec_to_millis(&exec_ctx, deadline); for (;;) { - gpr_timespec iteration_deadline = deadline; + grpc_millis iteration_deadline = deadline_millis; if (is_finished_arg.stolen_completion != NULL) { grpc_cq_completion *c = is_finished_arg.stolen_completion; @@ -858,7 +856,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { - iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + iteration_deadline = 0; } } @@ -881,8 +879,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, break; } - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -893,7 +891,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, gpr_mu_lock(cq->mu); cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - NULL, now, iteration_deadline); + NULL, iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { @@ -1022,8 +1020,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { } gpr_mu_unlock(cq->mu); } - return !a->first_loop && - gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); } static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, @@ -1032,7 +1029,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; - gpr_timespec now; cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -1059,12 +1055,14 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cq, - .deadline = deadline, + .deadline = 0, .stolen_completion = NULL, .tag = tag, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); + grpc_millis deadline_millis = is_finished_arg.deadline = + grpc_timespec_to_millis(&exec_ctx, deadline); for (;;) { if (is_finished_arg.stolen_completion != NULL) { gpr_mu_unlock(cq->mu); @@ -1111,8 +1109,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1120,10 +1118,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - - cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - &worker, now, deadline); + &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c index 54693c4900..6fbd43aa6e 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.c +++ b/test/core/end2end/fixtures/http_proxy_fixture.c @@ -365,8 +365,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, GPR_ASSERT(resolved_addresses->naddrs >= 1); // Connect to requested address. // The connection callback inherits our reference to conn. - const gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(10, GPR_TIMESPAN)); + const grpc_millis deadline = + grpc_exec_ctx_now(exec_ctx) + 10 * GPR_MS_PER_SEC; grpc_tcp_client_connect(exec_ctx, &conn->on_server_connect_done, &conn->server_endpoint, conn->pollset_set, NULL, &resolved_addresses->addrs[0], deadline); @@ -422,14 +422,12 @@ static void thread_main(void* arg) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; do { gpr_ref(&proxy->users); - const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - const gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); grpc_pollset_worker* worker = NULL; gpr_mu_lock(proxy->mu); GRPC_LOG_IF_ERROR( "grpc_pollset_work", - grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline)); + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, + grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC)); gpr_mu_unlock(proxy->mu); grpc_exec_ctx_flush(&exec_ctx); } while (!gpr_unref(&proxy->users)); diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 11b45e8e08..b7cbba63c1 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -172,10 +172,11 @@ static void read_and_write_test(grpc_endpoint_test_config config, size_t num_bytes, size_t write_size, size_t slice_size, bool shutdown) { struct read_and_write_test_state state; - gpr_timespec deadline = grpc_timeout_seconds_to_deadline(20); grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", slice_size); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_millis deadline = + grpc_timespec_to_millis(&exec_ctx, grpc_timeout_seconds_to_deadline(20)); gpr_log(GPR_DEBUG, "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR " shutdown=%d", num_bytes, write_size, slice_size, shutdown); @@ -231,11 +232,10 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_mu_lock(g_mu); while (!state.read_done || !state.write_done) { grpc_pollset_worker *worker = NULL; - GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); + GPR_ASSERT(grpc_exec_ctx_now(&exec_ctx) < deadline); GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline))); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); } gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(&exec_ctx); @@ -260,14 +260,14 @@ static void wait_for_fail_count(grpc_exec_ctx *exec_ctx, int *fail_count, int want_fail_count) { grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(g_mu); - gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10); - while (gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0 && + grpc_millis deadline = + grpc_timespec_to_millis(exec_ctx, grpc_timeout_seconds_to_deadline(10)); + while (grpc_exec_ctx_now(exec_ctx) < deadline && *fail_count < want_fail_count) { grpc_pollset_worker *worker = NULL; GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); + grpc_pollset_work(exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(g_mu); diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index e2331fbd97..363b51ca17 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -92,8 +92,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials( "pollset_work", grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&request.pops), - &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + &worker, GRPC_MILLIS_INF_FUTURE))) { request.is_done = 1; } } diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index d5739effb3..e3ef486251 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -87,7 +87,7 @@ void grpc_free_port_using_server(int port) { grpc_resource_quota *resource_quota = grpc_resource_quota_create("port_server_client/free"); grpc_httpcli_get(&exec_ctx, &context, &pr.pops, resource_quota, &req, - grpc_timeout_seconds_to_deadline(30), + grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC, GRPC_CLOSURE_CREATE(freed_port_from_server, &pr, grpc_schedule_on_exec_ctx), &rsp); @@ -99,8 +99,8 @@ void grpc_free_port_using_server(int port) { if (!GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), - &worker, gpr_now(GPR_CLOCK_MONOTONIC), - grpc_timeout_seconds_to_deadline(1)))) { + &worker, + grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) { pr.done = 1; } } @@ -171,7 +171,7 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_resource_quota *resource_quota = grpc_resource_quota_create("port_server_client/pick_retry"); grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, resource_quota, &req, - grpc_timeout_seconds_to_deadline(10), + grpc_exec_ctx_now(exec_ctx) + 30 * GPR_MS_PER_SEC, GRPC_CLOSURE_CREATE(got_port_from_server, pr, grpc_schedule_on_exec_ctx), &pr->response); @@ -221,7 +221,7 @@ int grpc_pick_port_using_server(void) { grpc_resource_quota_create("port_server_client/pick"); grpc_httpcli_get( &exec_ctx, &context, &pr.pops, resource_quota, &req, - grpc_timeout_seconds_to_deadline(30), + grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC, GRPC_CLOSURE_CREATE(got_port_from_server, &pr, grpc_schedule_on_exec_ctx), &pr.response); grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); @@ -232,8 +232,8 @@ int grpc_pick_port_using_server(void) { if (!GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), - &worker, gpr_now(GPR_CLOCK_MONOTONIC), - grpc_timeout_seconds_to_deadline(1)))) { + &worker, + grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) { pr.port = 0; } } diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 611868ce5a..14e53ef805 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -1464,8 +1464,11 @@ class BuildAndRunError(object): def _has_epollexclusive(): + binary = 'bins/%s/check_epollexclusive' % args.config + if not os.path.exists(binary): + return False try: - subprocess.check_call('bins/%s/check_epollexclusive' % args.config) + subprocess.check_call(binary) return True except subprocess.CalledProcessError, e: return False |