diff options
Diffstat (limited to 'src/core/lib')
135 files changed, 693 insertions, 659 deletions
diff --git a/src/core/lib/support/backoff.cc b/src/core/lib/backoff/backoff.cc index 6dc0df473b..fe0a751817 100644 --- a/src/core/lib/support/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -16,13 +16,14 @@ * */ -#include "src/core/lib/support/backoff.h" +#include "src/core/lib/backoff/backoff.h" #include <grpc/support/useful.h> -void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, - double multiplier, double jitter, - int64_t min_timeout_millis, int64_t max_timeout_millis) { +void grpc_backoff_init(grpc_backoff *backoff, + grpc_millis initial_connect_timeout, double multiplier, + double jitter, grpc_millis min_timeout_millis, + grpc_millis max_timeout_millis) { backoff->initial_connect_timeout = initial_connect_timeout; backoff->multiplier = multiplier; backoff->jitter = jitter; @@ -31,11 +32,11 @@ void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; } -gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) { +grpc_millis grpc_backoff_begin(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff) { backoff->current_timeout_millis = backoff->initial_connect_timeout; - const int64_t first_timeout = + const grpc_millis first_timeout = GPR_MAX(backoff->current_timeout_millis, backoff->min_timeout_millis); - return gpr_time_add(now, gpr_time_from_millis(first_timeout, GPR_TIMESPAN)); + return grpc_exec_ctx_now(exec_ctx) + first_timeout; } /* Generate a random number between 0 and 1. */ @@ -44,11 +45,11 @@ static double generate_uniform_random_number(uint32_t *rng_state) { return *rng_state / (double)((uint32_t)1 << 31); } -gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { +grpc_millis grpc_backoff_step(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff) { const double new_timeout_millis = backoff->multiplier * (double)backoff->current_timeout_millis; backoff->current_timeout_millis = - GPR_MIN((int64_t)new_timeout_millis, backoff->max_timeout_millis); + GPR_MIN((grpc_millis)new_timeout_millis, backoff->max_timeout_millis); const double jitter_range_width = backoff->jitter * new_timeout_millis; const double jitter = @@ -56,17 +57,17 @@ gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { jitter_range_width; backoff->current_timeout_millis = - (int64_t)((double)(backoff->current_timeout_millis) + jitter); + (grpc_millis)((double)(backoff->current_timeout_millis) + jitter); - const gpr_timespec current_deadline = gpr_time_add( - now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); + const grpc_millis current_deadline = + grpc_exec_ctx_now(exec_ctx) + backoff->current_timeout_millis; - const gpr_timespec min_deadline = gpr_time_add( - now, gpr_time_from_millis(backoff->min_timeout_millis, GPR_TIMESPAN)); + const grpc_millis min_deadline = + grpc_exec_ctx_now(exec_ctx) + backoff->min_timeout_millis; - return gpr_time_max(current_deadline, min_deadline); + return GPR_MAX(current_deadline, min_deadline); } -void gpr_backoff_reset(gpr_backoff *backoff) { +void grpc_backoff_reset(grpc_backoff *backoff) { backoff->current_timeout_millis = backoff->initial_connect_timeout; } diff --git a/src/core/lib/support/backoff.h b/src/core/lib/backoff/backoff.h index 31ec28f666..80e49ea52a 100644 --- a/src/core/lib/support/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -16,10 +16,10 @@ * */ -#ifndef GRPC_CORE_LIB_SUPPORT_BACKOFF_H -#define GRPC_CORE_LIB_SUPPORT_BACKOFF_H +#ifndef GRPC_CORE_LIB_BACKOFF_BACKOFF_H +#define GRPC_CORE_LIB_BACKOFF_BACKOFF_H -#include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" #ifdef __cplusplus extern "C" { @@ -27,38 +27,40 @@ extern "C" { typedef struct { /// const: how long to wait after the first failure before retrying - int64_t initial_connect_timeout; + grpc_millis initial_connect_timeout; /// const: factor with which to multiply backoff after a failed retry double multiplier; /// const: amount to randomize backoffs double jitter; /// const: minimum time between retries in milliseconds - int64_t min_timeout_millis; + grpc_millis min_timeout_millis; /// const: maximum time between retries in milliseconds - int64_t max_timeout_millis; + grpc_millis max_timeout_millis; /// random number generator uint32_t rng_state; /// current retry timeout in milliseconds - int64_t current_timeout_millis; -} gpr_backoff; + grpc_millis current_timeout_millis; +} grpc_backoff; /// Initialize backoff machinery - does not need to be destroyed -void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, - double multiplier, double jitter, - int64_t min_timeout_millis, int64_t max_timeout_millis); +void grpc_backoff_init(grpc_backoff *backoff, + grpc_millis initial_connect_timeout, double multiplier, + double jitter, grpc_millis min_timeout_millis, + grpc_millis max_timeout_millis); /// Begin retry loop: returns a timespec for the NEXT retry -gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now); +grpc_millis grpc_backoff_begin(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff); /// Step a retry loop: returns a timespec for the NEXT retry -gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now); -/// Reset the backoff, so the next gpr_backoff_step will be a gpr_backoff_begin +grpc_millis grpc_backoff_step(grpc_exec_ctx *exec_ctx, grpc_backoff *backoff); +/// Reset the backoff, so the next grpc_backoff_step will be a +/// grpc_backoff_begin /// instead -void gpr_backoff_reset(gpr_backoff *backoff); +void grpc_backoff_reset(grpc_backoff *backoff); #ifdef __cplusplus } #endif -#endif /* GRPC_CORE_LIB_SUPPORT_BACKOFF_H */ +#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */ diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 2837174f49..1896d35cf4 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -157,4 +157,4 @@ grpc_arg grpc_channel_arg_pointer_create(char *name, void *value, } #endif -#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index f0de80f0c0..5c00c09889 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -70,7 +70,7 @@ typedef struct { grpc_call_context_element *context; grpc_slice path; gpr_timespec start_time; - gpr_timespec deadline; + grpc_millis deadline; gpr_arena *arena; grpc_call_combiner *call_combiner; } grpc_call_element_args; diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h index b55a1089a0..4615727baa 100644 --- a/src/core/lib/channel/connected_channel.h +++ b/src/core/lib/channel/connected_channel.h @@ -38,4 +38,4 @@ grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem); } #endif -#endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index 1753da5721..b27ee37e5b 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -232,7 +232,7 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data) { gpr_mu_lock(&mgr->mu); GPR_ASSERT(mgr->index == 0); @@ -255,9 +255,7 @@ void grpc_handshake_manager_do_handshake( gpr_ref(&mgr->refs); GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &mgr->deadline_timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, deadline, &mgr->on_timeout); // Start first handshaker, which also owns a ref. gpr_ref(&mgr->refs); bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index a857cde791..8ed38c15ba 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -149,7 +149,7 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data); /// Add \a mgr to the server side list of all pending handshake managers, the @@ -172,4 +172,4 @@ void grpc_handshake_manager_pending_list_shutdown_all( } #endif -#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ diff --git a/src/core/lib/channel/handshaker_factory.h b/src/core/lib/channel/handshaker_factory.h index 2a130de252..59008adf05 100644 --- a/src/core/lib/channel/handshaker_factory.h +++ b/src/core/lib/channel/handshaker_factory.h @@ -56,4 +56,4 @@ void grpc_handshaker_factory_destroy( } #endif -#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H */ diff --git a/src/core/lib/channel/handshaker_registry.h b/src/core/lib/channel/handshaker_registry.h index e96bf06b6a..ddd280bea8 100644 --- a/src/core/lib/channel/handshaker_registry.h +++ b/src/core/lib/channel/handshaker_registry.h @@ -53,4 +53,4 @@ void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H */ diff --git a/src/core/lib/compression/algorithm_metadata.h b/src/core/lib/compression/algorithm_metadata.h index 3eb7088230..17caf58f69 100644 --- a/src/core/lib/compression/algorithm_metadata.h +++ b/src/core/lib/compression/algorithm_metadata.h @@ -57,4 +57,4 @@ grpc_stream_compression_algorithm grpc_stream_compression_algorithm_from_slice( } #endif -#endif /* GRPC_CORE_LIB_COMPRESSION_ALGORITHM_METADATA_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_COMPRESSION_ALGORITHM_METADATA_H */ diff --git a/src/core/lib/compression/message_compress.h b/src/core/lib/compression/message_compress.h index d2545a02c2..fffe175fd2 100644 --- a/src/core/lib/compression/message_compress.h +++ b/src/core/lib/compression/message_compress.h @@ -44,4 +44,4 @@ int grpc_msg_decompress(grpc_exec_ctx* exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_COMPRESSION_MESSAGE_COMPRESS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_COMPRESSION_MESSAGE_COMPRESS_H */ diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index c0aec63c1d..5bd7884e28 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -77,6 +77,7 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "http2_initiate_write_due_to_transport_flow_control_unstalled", "http2_initiate_write_due_to_ping_response", "http2_initiate_write_due_to_force_rst_stream", + "http2_spurious_writes_begun", "hpack_recv_indexed", "hpack_recv_lithdr_incidx", "hpack_recv_lithdr_incidx_v", @@ -177,6 +178,7 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "'transport_flow_control_unstalled'", "Number of HTTP2 writes initiated due to 'ping_response'", "Number of HTTP2 writes initiated due to 'force_rst_stream'", + "Number of HTTP2 writes initiated with nothing to write", "Number of HPACK indexed fields received", "Number of HPACK literal headers received with incremental indexing", "Number of HPACK literal headers received with incremental indexing and " diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index cf5bafbd04..d8e4e7d264 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -83,6 +83,7 @@ typedef enum { GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED, GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE, GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM, + GRPC_STATS_COUNTER_HTTP2_SPURIOUS_WRITES_BEGUN, GRPC_STATS_COUNTER_HPACK_RECV_INDEXED, GRPC_STATS_COUNTER_HPACK_RECV_LITHDR_INCIDX, GRPC_STATS_COUNTER_HPACK_RECV_LITHDR_INCIDX_V, @@ -330,6 +331,9 @@ typedef enum { GRPC_STATS_INC_COUNTER( \ (exec_ctx), \ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM) +#define GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_SPURIOUS_WRITES_BEGUN) #define GRPC_STATS_INC_HPACK_RECV_INDEXED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HPACK_RECV_INDEXED) #define GRPC_STATS_INC_HPACK_RECV_LITHDR_INCIDX(exec_ctx) \ diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index b5c15ff55c..5c0ab2262e 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -189,6 +189,8 @@ doc: Number of HTTP2 writes initiated due to 'ping_response' - counter: http2_initiate_write_due_to_force_rst_stream doc: Number of HTTP2 writes initiated due to 'force_rst_stream' +- counter: http2_spurious_writes_begun + doc: Number of HTTP2 writes initiated with nothing to write - counter: hpack_recv_indexed doc: Number of HPACK indexed fields received - counter: hpack_recv_lithdr_incidx @@ -270,3 +272,4 @@ - counter: server_slowpath_requests_queued doc: How many times was the server slow path taken (indicates too few outstanding requests) + diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index f96e40c00e..54869977b0 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -52,6 +52,7 @@ http2_initiate_write_due_to_keepalive_ping_per_iteration:FLOAT, http2_initiate_write_due_to_transport_flow_control_unstalled_per_iteration:FLOAT, http2_initiate_write_due_to_ping_response_per_iteration:FLOAT, http2_initiate_write_due_to_force_rst_stream_per_iteration:FLOAT, +http2_spurious_writes_begun_per_iteration:FLOAT, hpack_recv_indexed_per_iteration:FLOAT, hpack_recv_lithdr_incidx_per_iteration:FLOAT, hpack_recv_lithdr_incidx_v_per_iteration:FLOAT, diff --git a/src/core/lib/http/format_request.h b/src/core/lib/http/format_request.h index a559aac660..2e77e8661a 100644 --- a/src/core/lib/http/format_request.h +++ b/src/core/lib/http/format_request.h @@ -37,4 +37,4 @@ grpc_slice grpc_httpcli_format_connect_request( } #endif -#endif /* GRPC_CORE_LIB_HTTP_FORMAT_REQUEST_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_HTTP_FORMAT_REQUEST_H */ diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index db995943a9..c96800b85c 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -44,7 +44,7 @@ typedef struct { grpc_endpoint *ep; char *host; char *ssl_host_override; - gpr_timespec deadline; + grpc_millis deadline; int have_read_byte; const grpc_httpcli_handshaker *handshaker; grpc_closure *on_done; @@ -65,7 +65,7 @@ static grpc_httpcli_post_override g_post_override = NULL; static void plaintext_handshake(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint, const char *host, - gpr_timespec deadline, + grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { @@ -240,7 +240,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_done, + grpc_millis deadline, grpc_closure *on_done, grpc_httpcli_response *response, const char *name, grpc_slice request_text) { internal_request *req = @@ -278,9 +278,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, - const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_done, - grpc_httpcli_response *response) { + const grpc_httpcli_request *request, grpc_millis deadline, + grpc_closure *on_done, grpc_httpcli_response *response) { char *name; if (g_get_override && g_get_override(exec_ctx, request, deadline, on_done, response)) { @@ -298,7 +297,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_closure *on_done, + grpc_millis deadline, grpc_closure *on_done, grpc_httpcli_response *response) { char *name; if (g_post_override && diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 630481da54..76b790fa8a 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -46,7 +46,7 @@ typedef struct grpc_httpcli_context { typedef struct { const char *default_port; void (*handshake)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint, - const char *host, gpr_timespec deadline, + const char *host, grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)); } grpc_httpcli_handshaker; @@ -87,8 +87,8 @@ void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, grpc_resource_quota *resource_quota, - const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_closure *on_complete, + const grpc_httpcli_request *request, grpc_millis deadline, + grpc_closure *on_complete, grpc_httpcli_response *response); /* Asynchronously perform a HTTP POST. @@ -110,18 +110,18 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_closure *on_complete, + grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); /* override functions return 1 if they handled the request, 0 otherwise */ typedef int (*grpc_httpcli_get_override)(grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request, - gpr_timespec deadline, + grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); typedef int (*grpc_httpcli_post_override)( grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request, - const char *body_bytes, size_t body_size, gpr_timespec deadline, + const char *body_bytes, size_t body_size, grpc_millis deadline, grpc_closure *on_complete, grpc_httpcli_response *response); void grpc_httpcli_set_override(grpc_httpcli_get_override get, @@ -131,4 +131,4 @@ void grpc_httpcli_set_override(grpc_httpcli_get_override get, } #endif -#endif /* GRPC_CORE_LIB_HTTP_HTTPCLI_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_HTTP_HTTPCLI_H */ diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index 8a0f225ba2..ef6c4a509b 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -157,7 +157,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, const char *host, - gpr_timespec deadline, + grpc_millis deadline, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { on_done_closure *c = (on_done_closure *)gpr_malloc(sizeof(*c)); diff --git a/src/core/lib/http/parser.h b/src/core/lib/http/parser.h index 5484948bea..d2bda6ae0e 100644 --- a/src/core/lib/http/parser.h +++ b/src/core/lib/http/parser.h @@ -117,4 +117,4 @@ extern grpc_tracer_flag grpc_http1_trace; } #endif -#endif /* GRPC_CORE_LIB_HTTP_PARSER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_HTTP_PARSER_H */ diff --git a/src/core/lib/support/block_annotate.h b/src/core/lib/iomgr/block_annotate.h index 8e3ef7df65..fcbfe9eb1a 100644 --- a/src/core/lib/support/block_annotate.h +++ b/src/core/lib/iomgr/block_annotate.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H -#define GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H +#ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H +#define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H #ifdef __cplusplus extern "C" { @@ -39,17 +39,26 @@ void gpr_thd_end_blocking_region(); do { \ gpr_thd_start_blocking_region(); \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION \ - do { \ - gpr_thd_end_blocking_region(); \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + gpr_thd_end_blocking_region(); \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + gpr_thd_end_blocking_region(); \ + grpc_exec_ctx_invalidate_now((ec)); \ } while (0) #else #define GRPC_SCHEDULING_START_BLOCKING_REGION \ do { \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION \ - do { \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + grpc_exec_ctx_invalidate_now((ec)); \ } while (0) #endif -#endif /* GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H */ +#endif /* GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H */ diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 16ff0ab733..21347d9023 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -103,4 +103,4 @@ struct grpc_endpoint { } #endif -#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */ diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index f8830022f4..ee91795749 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -37,4 +37,4 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h index f718e06d4e..8746d5d353 100644 --- a/src/core/lib/iomgr/error_internal.h +++ b/src/core/lib/iomgr/error_internal.h @@ -65,4 +65,4 @@ bool grpc_error_is_special(grpc_error *err); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 689aac15bf..6126e2771c 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -39,12 +40,12 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; @@ -561,25 +562,17 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_END("pollset_shutdown", 0); } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) { + return INT_MAX; + } else if (delta < 0) { return 0; + } else { + return (int)delta; } - - static const gpr_timespec round_up = { - 0, /* tv_sec */ - GPR_NS_PER_MS - 1, /* tv_nsec */ - GPR_TIMESPAN /* clock_type */ - }; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? millis : 1; } /* Process the epoll events found by do_epoll_wait() function. @@ -636,11 +629,11 @@ static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx, (i.e the designated poller thread) will be calling this function. So there is no need for any synchronization when accesing fields in g_epoll_set */ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("do_epoll_wait", 0); int r; - int timeout = poll_deadline_to_millis_timeout(deadline, now); + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (timeout != 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; } @@ -650,7 +643,7 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -668,9 +661,10 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { GPR_TIMER_BEGIN("begin_worker", 0); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -755,14 +749,15 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->shutting_down); } - if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && + if (gpr_cv_wait(&worker->cv, &pollset->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) && worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); } } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -941,7 +936,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; @@ -952,7 +947,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } - if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, ps, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!ps->shutting_down); @@ -975,8 +970,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, designated poller */ if (gpr_atm_acq_load(&g_epoll_set.cursor) == gpr_atm_acq_load(&g_epoll_set.num_events)) { - append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline), - err_desc); + append_error(&error, do_epoll_wait(exec_ctx, ps, deadline), err_desc); } append_error(&error, process_epoll_events(exec_ctx, ps), err_desc); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.h b/src/core/lib/iomgr/ev_epoll1_linux.h index 66fd826b49..b437032b36 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.h +++ b/src/core/lib/iomgr/ev_epoll1_linux.h @@ -34,4 +34,4 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 93f9d2feff..a44cdb8597 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -25,6 +25,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -38,7 +39,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h" #include "src/core/lib/iomgr/lockfree_event.h" @@ -46,7 +47,6 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" // debug aid: create workers on the heap (allows asan to spot @@ -651,32 +651,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &pollset->mu; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - - static const gpr_timespec round_up = { - 0, /* tv_sec */ - GPR_NS_PER_MS - 1, /* tv_nsec */ - GPR_TIMESPAN /* clock_type */ - }; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -781,9 +765,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, gpr_timespec now, - gpr_timespec deadline) { - int timeout = poll_deadline_to_millis_timeout(deadline, now); + pollable *p, grpc_millis deadline) { + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); @@ -800,7 +783,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -855,9 +838,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, } /* Return true if this thread should poll */ -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { bool do_poll = true; if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -875,10 +859,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, worker->pollable_obj, worker, - poll_deadline_to_millis_timeout(deadline, *now)); + poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable_obj->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, deadline)) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, worker->pollable_obj, worker); @@ -896,7 +881,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->pollable_obj, worker); } } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } gpr_mu_unlock(&worker->pollable_obj->mu); @@ -932,7 +917,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP grpc_pollset_worker *worker = (grpc_pollset_worker *)gpr_malloc(sizeof(*worker)); @@ -942,10 +927,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, #define WORKER_PTR (&worker) #endif if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 - ".%09d deadline=%" PRId64 ".%09d kwp=%d pollable=%p", - pollset, worker_hdl, WORKER_PTR, now.tv_sec, now.tv_nsec, - deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller, + gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR " kwp=%d pollable=%p", + pollset, worker_hdl, WORKER_PTR,grpc_exec_ctx_now(exec_ctx), + deadline, pollset->kicked_without_poller, pollset->active_pollable); } static const char *err_desc = "pollset_work"; @@ -953,7 +937,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; } else { - if (begin_worker(pollset, WORKER_PTR, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); GPR_ASSERT(!pollset->shutdown_closure); @@ -961,7 +945,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->event_cursor == pollset->event_count) { append_error(&error, pollset_epoll(exec_ctx, pollset, WORKER_PTR->pollable_obj, - now, deadline), + deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), diff --git a/src/core/lib/iomgr/ev_epollex_linux.h b/src/core/lib/iomgr/ev_epollex_linux.h index 58cc5a24f8..2849a23283 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.h +++ b/src/core/lib/iomgr/ev_epollex_linux.h @@ -33,4 +33,4 @@ const grpc_event_engine_vtable *grpc_init_epollex_linux( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index c8e07c6e18..035bdc4cb5 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/port.h" +#include <grpc/grpc_posix.h> + /* This polling engine is only relevant on linux kernels supporting epoll() */ #ifdef GRPC_LINUX_EPOLL @@ -25,6 +27,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <signal.h> @@ -40,13 +43,13 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -1089,30 +1092,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutdown_done = NULL; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); - return millis >= 1 ? millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -1243,7 +1232,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (ep_rv < 0) { if (errno != EINTR) { gpr_asprintf(&err_msg, @@ -1310,10 +1299,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("pollset_work", 0); grpc_error *error = GRPC_ERROR_NONE; - int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); + int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline); sigset_t new_mask; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index e170702dca..036a35690c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <string.h> #include <sys/socket.h> @@ -37,12 +38,11 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/murmur_hash.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -50,7 +50,6 @@ /******************************************************************************* * FD declarations */ - typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; @@ -200,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -876,7 +875,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) { static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; @@ -945,7 +944,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd_watcher *watchers; struct pollfd *pfds; - timeout = poll_deadline_to_millis_timeout(deadline, now); + timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -991,7 +990,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GRPC_SCHEDULING_START_BLOCKING_REGION; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); r = grpc_poll_function(pfds, pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); @@ -1068,13 +1067,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + deadline = 0; } keep_polling = 1; } - if (keep_polling) { - now = gpr_now(now.clock_type); - } } gpr_tls_set(&g_current_thread_poller, 0); if (added_worker) { @@ -1126,21 +1122,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; + if (deadline == 0) return 0; + grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + if (n < 0) return 0; + if (n > INT_MAX) return -1; + return (int)n; } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h index 84b68155b5..861257204b 100644 --- a/src/core/lib/iomgr/ev_poll_posix.h +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -32,4 +32,4 @@ const grpc_event_engine_vtable *grpc_init_poll_cv_posix(bool explicit_request); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 80269881cb..5656ebb340 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -61,10 +61,28 @@ typedef struct { event_engine_factory_fn factory; } event_engine_factory; +namespace { +extern "C" { +int dummypoll(struct pollfd fds[], nfds_t nfds, int timeout) { + gpr_log(GPR_ERROR, "Attempted to poll despite declaring non-polling."); + GPR_ASSERT(false); + return -1; +} +} // extern "C" + +const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { + // return the simplest engine as a dummy but also override the poller + auto ret = grpc_init_poll_posix(explicit_request); + grpc_poll_function = dummypoll; + return ret; +} +} // namespace + static const event_engine_factory g_factories[] = { {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, + {"none", init_non_polling}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { @@ -203,9 +221,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); + grpc_pollset_worker **worker, + grpc_millis deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline); } grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 5ad1c13ee6..bc4456c2a2 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,8 +56,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, + grpc_millis deadline); grpc_error *(*pollset_kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -166,4 +166,4 @@ const grpc_event_engine_vtable *grpc_get_event_engine_test_only(); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 41c69add17..3d17afcb8f 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -104,9 +104,69 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -void grpc_exec_ctx_global_init(void) {} +static gpr_timespec + g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the + // last enum value in + // gpr_clock_type + +void grpc_exec_ctx_global_init(void) { + for (int i = 0; i < GPR_TIMESPAN; i++) { + g_start_time[i] = gpr_now((gpr_clock_type)i); + } + // allows uniform treatment in conversion functions + g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN); +} + void grpc_exec_ctx_global_shutdown(void) {} +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { + exec_ctx->now_is_valid = false; +} + +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, + gpr_clock_type clock_type) { + if (clock_type == GPR_TIMESPAN) { + return gpr_time_from_millis(millis, GPR_TIMESPAN); + } + return gpr_time_add(g_start_time[clock_type], + gpr_time_from_millis(millis, GPR_TIMESPAN)); +} + +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { + return timespec_to_atm_round_down(ts); +} + +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { + return timespec_to_atm_round_up(ts); +} + static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index a93728f0a6..44b9be7aa9 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -19,14 +19,19 @@ #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H +#include <grpc/support/atm.h> #include <grpc/support/cpu.h> + #include "src/core/lib/iomgr/closure.h" #ifdef __cplusplus extern "C" { #endif -/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ +typedef gpr_atm grpc_millis; + +#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX +#define GRPC_MILLIS_INF_PAST GPR_ATM_MIN /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ @@ -70,6 +75,9 @@ struct grpc_exec_ctx { unsigned starting_cpu; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); + + bool now_is_valid; + grpc_millis now; }; /* initializer for grpc_exec_ctx: @@ -77,7 +85,7 @@ struct grpc_exec_ctx { #define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ { \ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, gpr_cpu_current_cpu(), \ - finish_check_arg, finish_check \ + finish_check_arg, finish_check, false, 0 \ } /* initialize an execution context at the top level of an API call into grpc @@ -110,6 +118,12 @@ void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx); +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx); +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index ebe7f240b4..92c3e70301 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -178,6 +178,7 @@ static void executor_thread(void *arg) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } + grpc_exec_ctx_invalidate_now(&exec_ctx); subtract_depth = run_closures(&exec_ctx, exec); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index ab3fc901de..ef5ac56c83 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -53,4 +53,4 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc index c082179c0b..78185cc084 100644 --- a/src/core/lib/iomgr/iocp_windows.cc +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -21,11 +21,13 @@ #ifdef GRPC_WINSOCK_SOCKET #include <winsock2.h> +#include <limits> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_windows.h> #include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/iocp_windows.h" @@ -40,25 +42,20 @@ static gpr_atm g_custom_events = 0; static HANDLE g_iocp; -static DWORD deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { +static DWORD deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) { return INFINITE; } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return (DWORD)gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline < now) return 0; + grpc_millis timeout = deadline - now; + if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE; + return static_cast<DWORD>(deadline - now); } grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, - gpr_timespec deadline) { + grpc_millis deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -67,9 +64,10 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket; grpc_winsocket_callback_info *info; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - success = GetQueuedCompletionStatus( - g_iocp, &bytes, &completion_key, &overlapped, - deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + success = + GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, + deadline_to_millis_timeout(exec_ctx, deadline)); + grpc_exec_ctx_invalidate_now(exec_ctx); if (success == 0 && overlapped == NULL) { return GRPC_IOCP_WORK_TIMEOUT; } @@ -121,7 +119,7 @@ void grpc_iocp_flush(void) { grpc_iocp_work_status work_status; do { - work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + work_status = grpc_iocp_work(&exec_ctx, GRPC_MILLIS_INF_PAST); } while (work_status == GRPC_IOCP_WORK_KICK || grpc_exec_ctx_flush(&exec_ctx)); } @@ -129,7 +127,7 @@ void grpc_iocp_flush(void) { void grpc_iocp_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (gpr_atm_acq_load(&g_custom_events)) { - grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_iocp_work(&exec_ctx, GRPC_MILLIS_INF_FUTURE); grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h index 341c159501..4efbc94645 100644 --- a/src/core/lib/iomgr/iocp_windows.h +++ b/src/core/lib/iomgr/iocp_windows.h @@ -34,7 +34,7 @@ typedef enum { } grpc_iocp_work_status; grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, - gpr_timespec deadline); + grpc_millis deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_flush(void); @@ -45,4 +45,4 @@ void grpc_iocp_add_socket(grpc_winsocket *); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */ diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 3a0605833a..d6a5b4a76c 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -51,7 +51,7 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); grpc_executor_init(exec_ctx); - grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_list_init(exec_ctx); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = (char *)"root"; grpc_network_status_init(); @@ -98,8 +98,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == - GRPC_TIMERS_FIRED) { + exec_ctx->now_is_valid = true; + exec_ctx->now = GRPC_MILLIS_INF_FUTURE; + if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index fea08496fe..6c0a08b918 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -40,4 +40,4 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */ diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index 005abbed13..52db37c89a 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -48,4 +48,4 @@ bool grpc_iomgr_abort_on_leaks(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/is_epollexclusive_available.h b/src/core/lib/iomgr/is_epollexclusive_available.h index 5c3e483065..9ae9c5c191 100644 --- a/src/core/lib/iomgr/is_epollexclusive_available.h +++ b/src/core/lib/iomgr/is_epollexclusive_available.h @@ -31,4 +31,4 @@ bool grpc_is_epollexclusive_available(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_IS_EPOLLEXCLUSIVE_AVAILABLE_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_IS_EPOLLEXCLUSIVE_AVAILABLE_H */ diff --git a/src/core/lib/iomgr/load_file.cc b/src/core/lib/iomgr/load_file.cc index 0b4d41ea4b..5cb4099ea4 100644 --- a/src/core/lib/iomgr/load_file.cc +++ b/src/core/lib/iomgr/load_file.cc @@ -25,7 +25,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/support/string.h" grpc_error *grpc_load_file(const char *filename, int add_null_terminator, @@ -73,6 +73,6 @@ end: GRPC_ERROR_UNREF(error); error = error_out; } - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; return error; } diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h index 925f004945..02229e569e 100644 --- a/src/core/lib/iomgr/lockfree_event.h +++ b/src/core/lib/iomgr/lockfree_event.h @@ -45,4 +45,4 @@ void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index af50d51257..cba38d4530 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,4 +35,4 @@ void grpc_network_status_shutdown_all_endpoints(); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index 4a37acf212..009f968fac 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -72,4 +72,4 @@ void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index 28d63949ea..799fae154c 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -75,8 +75,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); pollset lock */ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) GRPC_MUST_USE_RESULT; + grpc_pollset_worker **worker, + grpc_millis deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is non-NULL, then kick that worker. */ diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h index 17df86542d..5455eda02f 100644 --- a/src/core/lib/iomgr/pollset_set.h +++ b/src/core/lib/iomgr/pollset_set.h @@ -52,4 +52,4 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */ diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index 7ea5019ad5..b9901bf8ef 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -116,13 +116,14 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { uint64_t timeout; GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { - if (gpr_time_cmp(deadline, now) >= 0) { - timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline >= now) { + timeout = deadline - now; } else { timeout = 0; } diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index d8f72ff867..5cc9faf4ff 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -32,4 +32,4 @@ void grpc_pollset_global_shutdown(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */ diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index eb295d3eeb..bb4df83fc1 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -110,7 +110,7 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {} grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; @@ -159,7 +159,8 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, &worker); added_worker = 1; while (!worker.kicked) { - if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { break; } } diff --git a/src/core/lib/iomgr/pollset_windows.h b/src/core/lib/iomgr/pollset_windows.h index 7733d26471..2479b25286 100644 --- a/src/core/lib/iomgr/pollset_windows.h +++ b/src/core/lib/iomgr/pollset_windows.h @@ -68,4 +68,4 @@ void grpc_pollset_global_shutdown(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 4a6df2cf26..5f0634299e 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -60,4 +60,4 @@ extern grpc_error *(*grpc_blocking_resolve_address)( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 60cfeebd47..1b783495df 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -33,10 +33,10 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_error *blocking_resolve_address_impl( @@ -81,7 +81,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { /* Retry if well-known service name is recognized */ @@ -90,7 +90,7 @@ static grpc_error *blocking_resolve_address_impl( if (strcmp(port, svc[i][0]) == 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; break; } } diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index abcfc2114d..451f01a701 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -34,10 +34,10 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" typedef struct { @@ -87,7 +87,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo"); goto done; diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 60262435b3..ecb5747da8 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -89,6 +89,8 @@ struct grpc_resource_user { grpc_closure_list on_allocated; /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; + /* How many bytes of allocations are outstanding */ + int64_t outstanding_allocations; /* True if we are currently trying to add ourselves to the non-free quota list, false otherwise */ bool added_to_free_pool; @@ -153,6 +155,9 @@ struct grpc_resource_quota { char *name; }; +static void ru_unref_by(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, gpr_atm amount); + /******************************************************************************* * list management */ @@ -289,6 +294,25 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR + " free_pool=%" PRId64, + resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown), + resource_user->free_pool); + } + if (gpr_atm_no_barrier_load(&resource_user->shutdown)) { + resource_user->allocating = false; + grpc_closure_list_fail_all( + &resource_user->on_allocated, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); + int64_t aborted_allocations = resource_user->outstanding_allocations; + resource_user->outstanding_allocations = 0; + resource_user->free_pool += aborted_allocations; + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); + gpr_mu_unlock(&resource_user->mu); + ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations); + continue; + } if (resource_user->free_pool < 0 && -resource_user->free_pool <= resource_quota->free_pool) { int64_t amt = -resource_user->free_pool; @@ -308,6 +332,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } if (resource_user->free_pool >= 0) { resource_user->allocating = false; + resource_user->outstanding_allocations = 0; GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { @@ -488,6 +513,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, } static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + gpr_log(GPR_DEBUG, "RU shutdown %p", ru); + } grpc_resource_user *resource_user = (grpc_resource_user *)ru; GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); @@ -497,6 +525,9 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { resource_user->reclaimers[1] = NULL; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); + if (resource_user->allocating) { + rq_step_sched(exec_ctx, resource_user->resource_quota); + } } static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { @@ -718,6 +749,7 @@ grpc_resource_user *grpc_resource_user_create( resource_user->reclaimers[1] = NULL; resource_user->new_reclaimers[0] = NULL; resource_user->new_reclaimers[1] = NULL; + resource_user->outstanding_allocations = 0; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } @@ -778,6 +810,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&resource_user->mu); ru_ref_by(resource_user, (gpr_atm)size); resource_user->free_pool -= (int64_t)size; + resource_user->outstanding_allocations += (int64_t)size; if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, @@ -792,6 +825,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } } else { + resource_user->outstanding_allocations -= (int64_t)size; GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index 3afb525434..1d4249b7e2 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -158,4 +158,4 @@ grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 129bb54fc9..1fd552febb 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -85,4 +85,4 @@ int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h index f319e931b6..d6c538ec6f 100644 --- a/src/core/lib/iomgr/socket_utils.h +++ b/src/core/lib/iomgr/socket_utils.h @@ -32,4 +32,4 @@ const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 623b83f08b..73809b68d3 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -137,4 +137,4 @@ grpc_error *grpc_create_dualstack_socket_using_factory( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index a00a7615a3..84fa071e89 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -115,4 +115,4 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 18cf6114f2..b2f365f2af 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -39,10 +39,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline); + grpc_millis deadline); #ifdef __cplusplus } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 7d9e9533fd..5611dd9062 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -48,7 +48,6 @@ extern grpc_tracer_flag grpc_tcp_trace; typedef struct { gpr_mu mu; grpc_fd *fd; - gpr_timespec deadline; grpc_timer alarm; grpc_closure on_alarm; int refs; @@ -244,7 +243,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -325,9 +324,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -342,7 +339,7 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -350,7 +347,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h index 0b9775504c..8740511804 100644 --- a/src/core/lib/iomgr/tcp_client_posix.h +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -35,4 +35,4 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc index 83835978f4..f3e9366299 100644 --- a/src/core/lib/iomgr/tcp_client_uv.cc +++ b/src/core/lib/iomgr/tcp_client_uv.cc @@ -119,7 +119,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *resolved_addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_uv_tcp_connect *connect; grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); (void)channel_args; @@ -158,9 +158,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, uv_tc_on_connect); GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &connect->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &connect->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &connect->alarm, deadline, &connect->on_alarm); } // overridden by api_fuzzer.c @@ -169,7 +167,7 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -177,7 +175,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index 1154965c82..9adf7ee4e9 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -43,7 +43,6 @@ typedef struct { grpc_closure *on_done; gpr_mu mu; grpc_winsocket *socket; - gpr_timespec deadline; grpc_timer alarm; grpc_closure on_alarm; char *addr_name; @@ -126,7 +125,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { static void tcp_client_connect_impl( grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, - const grpc_resolved_address *addr, gpr_timespec deadline) { + const grpc_resolved_address *addr, grpc_millis deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; int status; @@ -206,8 +205,7 @@ static void tcp_client_connect_impl( GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm, - gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect); return; @@ -233,7 +231,7 @@ void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -241,7 +239,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 4489896ffb..dbcc976ae9 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -135,13 +135,11 @@ static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); + grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC; GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx); - GRPC_LOG_IF_ERROR("backup_poller:pollset_work", - grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, - now, deadline)); + GRPC_LOG_IF_ERROR( + "backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, deadline)); gpr_mu_unlock(p->pollset_mu); /* last "uncovered" notification is the ref that keeps us polling, if we get * there try a cas to release it */ diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index dda78b2f8e..47e78fa67e 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -61,4 +61,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 3f190ac285..8f9ce3819e 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -106,4 +106,4 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 4bb0660f09..6746333960 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -125,4 +125,4 @@ bool grpc_tcp_server_have_ifaddrs(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h index ba7db8a0f7..3399535b42 100644 --- a/src/core/lib/iomgr/tcp_uv.h +++ b/src/core/lib/iomgr/tcp_uv.h @@ -50,4 +50,4 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ diff --git a/src/core/lib/iomgr/time_averaged_stats.h b/src/core/lib/iomgr/time_averaged_stats.h index e255b58fee..d38ed272b6 100644 --- a/src/core/lib/iomgr/time_averaged_stats.h +++ b/src/core/lib/iomgr/time_averaged_stats.h @@ -78,4 +78,4 @@ double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats* stats); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */ diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index 466600d582..419e834cf1 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -45,8 +45,7 @@ typedef struct grpc_timer grpc_timer; application callback is also responsible for maintaining information about when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now); + grpc_millis deadline, grpc_closure *closure); /* Initialize *timer without setting it. This can later be passed through the regular init or cancel */ @@ -96,8 +95,8 @@ typedef enum { with high probability at least one thread in the system will see an update at any time slice. */ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next); -void grpc_timer_list_init(gpr_timespec now); + grpc_millis *next); +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); /* Consume a kick issued by grpc_kick_poller */ diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 971d80d8bc..b8e895de6f 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -220,9 +220,6 @@ struct shared_mutables { static struct shared_mutables g_shared_mutables; -static gpr_clock_type g_clock_type; -static gpr_timespec g_start_time; - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -235,52 +232,19 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm *next, grpc_error *error); -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - -static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = GPR_MS_PER_SEC * (double)ts.tv_sec + - (double)ts.tv_nsec / GPR_NS_PER_MS + - (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = - GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_timespec atm_to_timespec(gpr_atm x) { - return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); -} - static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_timer_list_init(gpr_timespec now) { +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { uint32_t i; g_shared_mutables.initialized = true; g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; gpr_mu_init(&g_shared_mutables.mu); - g_clock_type = now.clock_type; - g_start_time = now; - g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); grpc_register_tracer(&grpc_timer_trace); @@ -317,10 +281,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { g_shared_mutables.initialized = false; } -static double ts_to_dbl(gpr_timespec ts) { - return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; -} - /* returns true if the first element in the list */ static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; @@ -361,24 +321,20 @@ static void note_deadline_change(timer_shard *shard) { void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { int is_first_timer = 0; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; - GPR_ASSERT(deadline.clock_type == g_clock_type); - GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline); + timer->deadline = deadline; #ifndef NDEBUG timer->hash_table_next = NULL; #endif if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR - "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", - timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec, - now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); + gpr_log(GPR_DEBUG, + "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, + deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); } if (!g_shared_mutables.initialized) { @@ -391,7 +347,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_mu_lock(&shard->mu); timer->pending = true; - if (gpr_time_cmp(deadline, now) <= 0) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline <= now) { timer->pending = false; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); @@ -400,11 +357,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } grpc_time_averaged_stats_add_sample(&shard->stats, - ts_to_dbl(gpr_time_sub(deadline, now))); + (double)(deadline - now) / 1000.0); ADD_TO_HASH_TABLE(timer); - if (deadline_atm < shard->queue_deadline_cap) { + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -435,12 +392,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, shard->min_deadline); } - if (deadline_atm < shard->min_deadline) { + if (deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline_atm; + shard->min_deadline = deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) { - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm); + if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline); grpc_kick_poller(); } } @@ -544,8 +501,9 @@ static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { } if (timer->deadline > now) return NULL; if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer, - now - timer->deadline); + gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler", + timer, now - timer->deadline, + timer->closure->scheduler->vtable->name); } timer->pending = false; grpc_timer_heap_pop(&shard->heap); @@ -567,6 +525,10 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR, + (int)(shard - g_shards), n); + } return n; } @@ -639,29 +601,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { // prelude - GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm_round_down(now); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ - gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); - if (now_atm < min_timer) { + grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { if (next != NULL) { - *next = - atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); + *next = GPR_MIN(*next, min_timer); } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, - "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, - now_atm, min_timer); + "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, + min_timer); } return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + now != GRPC_MILLIS_INF_FUTURE ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); @@ -671,34 +631,24 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, timespec_to_atm_round_down(*next)); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR - "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, - now.tv_sec, now.tv_nsec, now_atm, next_str, - gpr_tls_get(&g_last_seen_min_timer), + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR + " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } // actual code - grpc_timer_check_result r; - gpr_atm next_atm; - if (next == NULL) { - r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); - } else { - next_atm = timespec_to_atm_round_down(*next); - r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); - *next = atm_to_timespec(next_atm); - } + grpc_timer_check_result r = + run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, next_atm); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); diff --git a/src/core/lib/iomgr/timer_heap.h b/src/core/lib/iomgr/timer_heap.h index f15e8a3abb..228d038ab3 100644 --- a/src/core/lib/iomgr/timer_heap.h +++ b/src/core/lib/iomgr/timer_heap.h @@ -47,4 +47,4 @@ int grpc_timer_heap_is_empty(grpc_timer_heap *heap); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */ diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 9b54fab898..1248f82189 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -55,7 +55,7 @@ static bool g_kicked; static bool g_has_timed_waiter; // the deadline of the current timed waiter thread (only relevant if // g_has_timed_waiter is true) -static gpr_timespec g_timed_waiter_deadline; +static grpc_millis g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -99,9 +99,8 @@ static void start_timer_thread_and_unlock(void) { void grpc_timer_manager_tick() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_timer_check(&exec_ctx, now, &next); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_timer_check(&exec_ctx, &next); grpc_exec_ctx_finish(&exec_ctx); } @@ -124,6 +123,9 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { gpr_mu_unlock(&g_mu); } // without our lock, flush the exec_ctx + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "flush exec_ctx"); + } grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&g_mu); // garbage collect any threads hanging out that are dead @@ -136,8 +138,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { // wait until 'next' (or forever if there is already a timed waiter in the pool) // returns true if the thread should continue executing (false if it should // shutdown) -static bool wait_until(gpr_timespec next) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); +static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) { gpr_mu_lock(&g_mu); // if we're not threaded anymore, leave if (!g_threaded) { @@ -171,30 +172,29 @@ static bool wait_until(gpr_timespec next) { unless their 'next' is earlier than the current timed-waiter's deadline (in which case the thread with earlier 'next' takes over as the new timed waiter) */ - if (gpr_time_cmp(next, inf_future) != 0) { - if (!g_has_timed_waiter || - (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + if (next != GRPC_MILLIS_INF_FUTURE) { + if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) { my_timed_waiter_generation = ++g_timed_waiter_generation; g_has_timed_waiter = true; g_timed_waiter_deadline = next; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = - gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); + gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", + wait_time); } } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; } } if (GRPC_TRACER_ON(grpc_timer_check_trace) && - gpr_time_cmp(next, inf_future) == 0) { + next == GRPC_MILLIS_INF_FUTURE) { gpr_log(GPR_DEBUG, "sleep until kicked"); } - gpr_cv_wait(&g_cv_wait, &g_mu, next); + gpr_cv_wait(&g_cv_wait, &g_mu, + grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME)); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", @@ -206,7 +206,7 @@ static bool wait_until(gpr_timespec next) { // there's work to do after checking timers (code above) if (my_timed_waiter_generation == g_timed_waiter_generation) { g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; } } @@ -222,12 +222,11 @@ static bool wait_until(gpr_timespec next) { } static void timer_main_loop(grpc_exec_ctx *exec_ctx) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = inf_future; - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_exec_ctx_invalidate_now(exec_ctx); // check timer state, updates next to the next time to run a check - switch (grpc_timer_check(exec_ctx, now, &next)) { + switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: run_some_timers(exec_ctx); break; @@ -244,10 +243,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: - if (!wait_until(next)) { + if (!wait_until(exec_ctx, next)) { return; } break; @@ -303,7 +302,7 @@ void grpc_timer_manager_init(void) { g_completed_threads = NULL; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; start_threads(); } @@ -350,7 +349,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/iomgr/timer_manager.h b/src/core/lib/iomgr/timer_manager.h index d8a59a9477..72960d6ffc 100644 --- a/src/core/lib/iomgr/timer_manager.h +++ b/src/core/lib/iomgr/timer_manager.h @@ -42,4 +42,4 @@ void grpc_timer_manager_tick(void); } #endif -#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */ diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc index 53f79b545a..ccbbe357ae 100644 --- a/src/core/lib/iomgr/timer_uv.cc +++ b/src/core/lib/iomgr/timer_uv.cc @@ -55,19 +55,18 @@ void run_expired_timer(uv_timer_t *handle) { } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { uint64_t timeout; uv_timer_t *uv_timer; GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; - if (gpr_time_cmp(deadline, now) <= 0) { + if (deadline <= grpc_exec_ctx_now(exec_ctx)) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } timer->pending = 1; - timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + timeout = (uint64_t)(deadline - grpc_exec_ctx_now(exec_ctx)); uv_timer = (uv_timer_t *)gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); uv_timer->data = timer; @@ -91,11 +90,11 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { return GRPC_TIMERS_NOT_CHECKED; } -void grpc_timer_list_init(gpr_timespec now) {} +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {} void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {} void grpc_timer_consume_kick(void) {} diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index bcd8572260..e887cb1bcf 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -81,4 +81,4 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, } #endif -#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */ diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h index b96131ae1c..3e7f9c7d1e 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.h +++ b/src/core/lib/iomgr/unix_sockets_posix.h @@ -46,4 +46,4 @@ char *grpc_sockaddr_to_uri_unix_if_possible( } #endif -#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */ diff --git a/src/core/lib/json/json.h b/src/core/lib/json/json.h index 81b7e0c9da..c9fdec4ecb 100644 --- a/src/core/lib/json/json.h +++ b/src/core/lib/json/json.h @@ -78,4 +78,4 @@ void grpc_json_destroy(grpc_json* json); } #endif -#endif /* GRPC_CORE_LIB_JSON_JSON_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_JSON_JSON_H */ diff --git a/src/core/lib/json/json_reader.h b/src/core/lib/json/json_reader.h index ab2384f7a7..7f14a9a9c8 100644 --- a/src/core/lib/json/json_reader.h +++ b/src/core/lib/json/json_reader.h @@ -150,4 +150,4 @@ int grpc_json_reader_is_complete(grpc_json_reader *reader); } #endif -#endif /* GRPC_CORE_LIB_JSON_JSON_READER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_JSON_JSON_READER_H */ diff --git a/src/core/lib/json/json_writer.h b/src/core/lib/json/json_writer.h index 18bd2a80fe..132d1f24e8 100644 --- a/src/core/lib/json/json_writer.h +++ b/src/core/lib/json/json_writer.h @@ -87,4 +87,4 @@ void grpc_json_writer_value_string(grpc_json_writer *writer, } #endif -#endif /* GRPC_CORE_LIB_JSON_JSON_WRITER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_JSON_JSON_WRITER_H */ diff --git a/src/core/lib/security/credentials/fake/fake_credentials.h b/src/core/lib/security/credentials/fake/fake_credentials.h index 64f6f439f0..ed3f893c58 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.h +++ b/src/core/lib/security/credentials/fake/fake_credentials.h @@ -64,4 +64,4 @@ typedef struct { } #endif -#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_FAKE_FAKE_CREDENTIALS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_FAKE_FAKE_CREDENTIALS_H */ diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc index 8fe5802d49..5b2ddceb4a 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc @@ -97,7 +97,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { /* The http call is local. If it takes more than one sec, it is for sure not on compute engine. */ - gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); + grpc_millis max_detection_delay = GPR_MS_PER_SEC; grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(pollset, &g_polling_mu); @@ -116,7 +116,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { grpc_resource_quota_create("google_default_credentials"); grpc_httpcli_get( exec_ctx, &context, &detector.pollent, resource_quota, &request, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), + grpc_exec_ctx_now(exec_ctx) + max_detection_delay, GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector, grpc_schedule_on_exec_ctx), &detector.response); @@ -133,8 +133,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { "pollset_work", grpc_pollset_work(exec_ctx, grpc_polling_entity_pollset(&detector.pollent), - &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + &worker, GRPC_MILLIS_INF_FUTURE))) { detector.is_done = 1; detector.success = 0; } diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.h b/src/core/lib/security/credentials/jwt/jwt_credentials.h index c09485fd55..5cee6ed0da 100644 --- a/src/core/lib/security/credentials/jwt/jwt_credentials.h +++ b/src/core/lib/security/credentials/jwt/jwt_credentials.h @@ -53,4 +53,4 @@ grpc_service_account_jwt_access_credentials_create_from_auth_json_key( } #endif -#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_JWT_JWT_CREDENTIALS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_JWT_JWT_CREDENTIALS_H */ diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc index aea16dee92..39e72c195b 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc @@ -384,7 +384,7 @@ void verifier_cb_ctx_destroy(grpc_exec_ctx *exec_ctx, verifier_cb_ctx *ctx) { gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN}; /* Max delay defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN}; +grpc_millis grpc_jwt_verifier_max_delay = 60 * GPR_MS_PER_SEC; typedef struct { char *email_domain; @@ -711,7 +711,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, resource_quota = grpc_resource_quota_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), + grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay, GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); @@ -838,10 +838,10 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ resource_quota = grpc_resource_quota_create("jwt_verifier"); - grpc_httpcli_get( - exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - http_cb, &ctx->responses[rsp_idx]); + grpc_httpcli_get(exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, + resource_quota, &req, + grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay, + http_cb, &ctx->responses[rsp_idx]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); gpr_free(req.host); gpr_free(req.http.path); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h index 0603811627..998365e75c 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.h +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h @@ -85,7 +85,7 @@ typedef struct { /* Globals to control the verifier. Not thread-safe. */ extern gpr_timespec grpc_jwt_verifier_clock_skew; -extern gpr_timespec grpc_jwt_verifier_max_delay; +extern grpc_millis grpc_jwt_verifier_max_delay; /* The verifier can be created with some custom mappings to help with key discovery in the case where the issuer is an email address. diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index 0a801bec82..f52a424e36 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -117,7 +117,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx, grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( grpc_exec_ctx *exec_ctx, const grpc_http_response *response, - grpc_mdelem *token_md, gpr_timespec *token_lifetime) { + grpc_mdelem *token_md, grpc_millis *token_lifetime) { char *null_terminated_body = NULL; char *new_access_token = NULL; grpc_credentials_status status = GRPC_CREDENTIALS_OK; @@ -183,9 +183,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response( } gpr_asprintf(&new_access_token, "%s %s", token_type->value, access_token->value); - token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10); - token_lifetime->tv_nsec = 0; - token_lifetime->clock_type = GPR_TIMESPAN; + *token_lifetime = strtol(expires_in->value, NULL, 10) * GPR_MS_PER_SEC; if (!GRPC_MDISNULL(*token_md)) GRPC_MDELEM_UNREF(exec_ctx, *token_md); *token_md = grpc_mdelem_from_slices( exec_ctx, @@ -214,7 +212,7 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)r->creds; grpc_mdelem access_token_md = GRPC_MDNULL; - gpr_timespec token_lifetime; + grpc_millis token_lifetime; grpc_credentials_status status = grpc_oauth2_token_fetcher_credentials_parse_server_response( exec_ctx, &r->response, &access_token_md, &token_lifetime); @@ -222,10 +220,9 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&c->mu); c->token_fetch_pending = false; c->access_token_md = GRPC_MDELEM_REF(access_token_md); - c->token_expiration = - status == GRPC_CREDENTIALS_OK - ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime) - : gpr_inf_past(GPR_CLOCK_REALTIME); + c->token_expiration = status == GRPC_CREDENTIALS_OK + ? grpc_exec_ctx_now(exec_ctx) + token_lifetime + : 0; grpc_oauth2_pending_get_request_metadata *pending_request = c->pending_requests; c->pending_requests = NULL; @@ -260,14 +257,12 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)creds; // Check if we can use the cached token. - gpr_timespec refresh_threshold = gpr_time_from_seconds( - GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); + grpc_millis refresh_threshold = + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS * GPR_MS_PER_SEC; grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (gpr_time_cmp( - gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)), - refresh_threshold) > 0)) { + (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { @@ -296,10 +291,10 @@ static bool oauth2_token_fetcher_get_request_metadata( gpr_mu_unlock(&c->mu); if (start_fetch) { grpc_call_credentials_ref(creds); - c->fetch_func( - exec_ctx, grpc_credentials_metadata_request_create(creds), - &c->httpcli_context, &c->pollent, on_oauth2_token_fetcher_http_response, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), refresh_threshold)); + c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds), + &c->httpcli_context, &c->pollent, + on_oauth2_token_fetcher_http_response, + grpc_exec_ctx_now(exec_ctx) + refresh_threshold); } return false; } @@ -340,7 +335,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); + c->token_expiration = 0; c->fetch_func = fetch_func; c->pollent = grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create()); @@ -358,7 +353,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = { static void compute_engine_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent, - grpc_iomgr_cb_func response_cb, gpr_timespec deadline) { + grpc_iomgr_cb_func response_cb, grpc_millis deadline) { grpc_http_header header = {(char *)"Metadata-Flavor", (char *)"Google"}; grpc_httpcli_request request; memset(&request, 0, sizeof(grpc_httpcli_request)); @@ -410,7 +405,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = { static void refresh_token_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent, - grpc_iomgr_cb_func response_cb, gpr_timespec deadline) { + grpc_iomgr_cb_func response_cb, grpc_millis deadline) { grpc_google_refresh_token_credentials *c = (grpc_google_refresh_token_credentials *)metadata_req->creds; grpc_http_header header = {(char *)"Content-Type", diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index c8a9333417..c12db896f3 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -61,7 +61,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *http_context, grpc_polling_entity *pollent, grpc_iomgr_cb_func cb, - gpr_timespec deadline); + grpc_millis deadline); typedef struct grpc_oauth2_pending_get_request_metadata { grpc_credentials_mdelem_array *md_array; @@ -74,7 +74,7 @@ typedef struct { grpc_call_credentials base; gpr_mu mu; grpc_mdelem access_token_md; - gpr_timespec token_expiration; + grpc_millis token_expiration; bool token_fetch_pending; grpc_oauth2_pending_get_request_metadata *pending_requests; grpc_httpcli_context httpcli_context; @@ -104,10 +104,10 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token( grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( grpc_exec_ctx *exec_ctx, const struct grpc_http_response *response, - grpc_mdelem *token_md, gpr_timespec *token_lifetime); + grpc_mdelem *token_md, grpc_millis *token_lifetime); #ifdef __cplusplus } #endif -#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */ diff --git a/src/core/lib/security/transport/lb_targets_info.h b/src/core/lib/security/transport/lb_targets_info.h index 705d33b0ab..43f0e64556 100644 --- a/src/core/lib/security/transport/lb_targets_info.h +++ b/src/core/lib/security/transport/lb_targets_info.h @@ -37,4 +37,4 @@ grpc_slice_hash_table *grpc_lb_targets_info_find_in_args( } #endif -#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_LB_TARGETS_INFO_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_LB_TARGETS_INFO_H */ diff --git a/src/core/lib/security/transport/secure_endpoint.h b/src/core/lib/security/transport/secure_endpoint.h index 832cc1c0ce..980449c03e 100644 --- a/src/core/lib/security/transport/secure_endpoint.h +++ b/src/core/lib/security/transport/secure_endpoint.h @@ -44,4 +44,4 @@ grpc_endpoint *grpc_secure_endpoint_create( } #endif -#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */ diff --git a/src/core/lib/security/transport/security_handshaker.h b/src/core/lib/security/transport/security_handshaker.h index 345065f26c..178099bb94 100644 --- a/src/core/lib/security/transport/security_handshaker.h +++ b/src/core/lib/security/transport/security_handshaker.h @@ -39,4 +39,4 @@ void grpc_security_register_handshaker_factories(); } #endif -#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */ diff --git a/src/core/lib/security/transport/tsi_error.h b/src/core/lib/security/transport/tsi_error.h index 4c78b06603..4e19daf796 100644 --- a/src/core/lib/security/transport/tsi_error.h +++ b/src/core/lib/security/transport/tsi_error.h @@ -32,4 +32,4 @@ grpc_error *grpc_set_tsi_error_result(grpc_error *error, tsi_result result); } #endif -#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_TSI_ERROR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_TSI_ERROR_H */ diff --git a/src/core/lib/security/util/json_util.h b/src/core/lib/security/util/json_util.h index 43a2f6b9d1..cdd8a7198a 100644 --- a/src/core/lib/security/util/json_util.h +++ b/src/core/lib/security/util/json_util.h @@ -45,4 +45,4 @@ bool grpc_copy_json_string_property(const grpc_json *json, } #endif -#endif /* GRPC_CORE_LIB_SECURITY_UTIL_JSON_UTIL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SECURITY_UTIL_JSON_UTIL_H */ diff --git a/src/core/lib/slice/b64.h b/src/core/lib/slice/b64.h index c01da56575..9b4dc65dbb 100644 --- a/src/core/lib/slice/b64.h +++ b/src/core/lib/slice/b64.h @@ -55,4 +55,4 @@ grpc_slice grpc_base64_decode_with_len(grpc_exec_ctx *exec_ctx, const char *b64, } #endif -#endif /* GRPC_CORE_LIB_SLICE_B64_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SLICE_B64_H */ diff --git a/src/core/lib/slice/percent_encoding.h b/src/core/lib/slice/percent_encoding.h index e6f85120c3..14a4deb44b 100644 --- a/src/core/lib/slice/percent_encoding.h +++ b/src/core/lib/slice/percent_encoding.h @@ -68,4 +68,4 @@ grpc_slice grpc_permissive_percent_decode_slice(grpc_slice slice_in); } #endif -#endif /* GRPC_CORE_LIB_SLICE_PERCENT_ENCODING_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SLICE_PERCENT_ENCODING_H */ diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h index 3c3f0e61f3..41250df738 100644 --- a/src/core/lib/slice/slice_hash_table.h +++ b/src/core/lib/slice/slice_hash_table.h @@ -75,4 +75,4 @@ int grpc_slice_hash_table_cmp(const grpc_slice_hash_table *a, } #endif -#endif /* GRPC_CORE_LIB_SLICE_SLICE_HASH_TABLE_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SLICE_SLICE_HASH_TABLE_H */ diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index 8591185c53..fcf70a0e55 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -54,4 +54,4 @@ int grpc_static_slice_eq(grpc_slice a, grpc_slice b); } #endif -#endif /* GRPC_CORE_LIB_SLICE_SLICE_INTERNAL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SLICE_SLICE_INTERNAL_H */ diff --git a/src/core/lib/slice/slice_traits.h b/src/core/lib/slice/slice_traits.h index 1eda17cf00..7fdb6752cb 100644 --- a/src/core/lib/slice/slice_traits.h +++ b/src/core/lib/slice/slice_traits.h @@ -34,4 +34,4 @@ bool grpc_slice_is_bin_suffixed(grpc_slice s); } #endif -#endif /* GRPC_CORE_LIB_SLICE_SLICE_TRAITS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SLICE_SLICE_TRAITS_H */ diff --git a/src/core/lib/support/env_windows.cc b/src/core/lib/support/env_windows.cc index 73c643c560..c5a25dc201 100644 --- a/src/core/lib/support/env_windows.cc +++ b/src/core/lib/support/env_windows.cc @@ -43,7 +43,10 @@ char *gpr_getenv(const char *name) { DWORD ret; ret = GetEnvironmentVariable(tname, NULL, 0); - if (ret == 0) return NULL; + if (ret == 0) { + gpr_free(tname); + return NULL; + } size = ret * (DWORD)sizeof(TCHAR); tresult = (LPTSTR)gpr_malloc(size); ret = GetEnvironmentVariable(tname, tresult, size); diff --git a/src/core/lib/support/log_android.cc b/src/core/lib/support/log_android.cc index 6f1cec51f1..9e8529cbac 100644 --- a/src/core/lib/support/log_android.cc +++ b/src/core/lib/support/log_android.cc @@ -39,8 +39,8 @@ static android_LogPriority severity_to_log_priority(gpr_log_severity severity) { return ANDROID_LOG_DEFAULT; } -void gpr_log(const char *file, int line, gpr_log_severity severity, - const char *format, ...) { +extern "C" void gpr_log(const char *file, int line, gpr_log_severity severity, + const char *format, ...) { char *message = NULL; va_list args; va_start(args, format); @@ -50,8 +50,8 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, free(message); } -void gpr_default_log(gpr_log_func_args *args) { - char *final_slash; +extern "C" void gpr_default_log(gpr_log_func_args *args) { + const char *final_slash; const char *display_file; char *output = NULL; diff --git a/src/core/lib/support/time_posix.cc b/src/core/lib/support/time_posix.cc index deccb50975..3267ea6b54 100644 --- a/src/core/lib/support/time_posix.cc +++ b/src/core/lib/support/time_posix.cc @@ -30,7 +30,6 @@ #include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/core/lib/support/block_annotate.h" static struct timespec timespec_from_gpr(gpr_timespec gts) { struct timespec rv; @@ -159,9 +158,7 @@ void gpr_sleep_until(gpr_timespec until) { delta = gpr_time_sub(until, now); delta_ts = timespec_from_gpr(delta); - GRPC_SCHEDULING_START_BLOCKING_REGION; ns_result = nanosleep(&delta_ts, NULL); - GRPC_SCHEDULING_END_BLOCKING_REGION; if (ns_result == 0) { break; } diff --git a/src/core/lib/support/time_windows.cc b/src/core/lib/support/time_windows.cc index dda7566cd8..08c1b22964 100644 --- a/src/core/lib/support/time_windows.cc +++ b/src/core/lib/support/time_windows.cc @@ -28,7 +28,6 @@ #include <process.h> #include <sys/timeb.h> -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/time_precise.h" static LARGE_INTEGER g_start_time; @@ -94,9 +93,7 @@ void gpr_sleep_until(gpr_timespec until) { sleep_millis = delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; GPR_ASSERT((sleep_millis >= 0) && (sleep_millis <= INT_MAX)); - GRPC_SCHEDULING_START_BLOCKING_REGION; Sleep((DWORD)sleep_millis); - GRPC_SCHEDULING_END_BLOCKING_REGION; } } diff --git a/src/core/lib/surface/alarm.cc b/src/core/lib/surface/alarm.cc index 4e67543191..16a16bfd93 100644 --- a/src/core/lib/surface/alarm.cc +++ b/src/core/lib/surface/alarm.cc @@ -126,8 +126,7 @@ void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, GPR_ASSERT(grpc_cq_begin_op(cq, tag)); grpc_timer_init(&exec_ctx, &alarm->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 6c97f5cc01..8216aa0ec8 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -216,7 +216,7 @@ struct grpc_call { server, it's trailing metadata */ grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; int send_extra_metadata_count; - gpr_timespec send_deadline; + grpc_millis send_deadline; grpc_slice_buffer_stream sending_stream; @@ -283,7 +283,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error); -static void get_final_status(grpc_call *call, +static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details); @@ -372,11 +372,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } for (i = 0; i < 2; i++) { for (j = 0; j < 2; j++) { - call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE; } } - gpr_timespec send_deadline = - gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); + grpc_millis send_deadline = args->send_deadline; bool immediately_cancel = false; @@ -394,10 +393,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { - send_deadline = gpr_time_min( - gpr_convert_clock_type(send_deadline, - args->parent->send_deadline.clock_type), - args->parent->send_deadline); + send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ @@ -551,8 +547,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind"); } - get_final_status(c, set_status_value_directly, &c->final_info.final_status, - NULL); + get_final_status(exec_ctx, c, set_status_value_directly, + &c->final_info.final_status, NULL); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -738,13 +734,16 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, * FINAL STATUS CODE MANIPULATION */ -static bool get_final_status_from( - grpc_call *call, grpc_error *error, bool allow_ok_status, - void (*set_value)(grpc_status_code code, void *user_data), - void *set_value_user_data, grpc_slice *details) { +static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_error *error, bool allow_ok_status, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data, + grpc_slice *details) { grpc_status_code code; grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL); + grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice, + NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } @@ -756,7 +755,7 @@ static bool get_final_status_from( return true; } -static void get_final_status(grpc_call *call, +static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { @@ -781,8 +780,9 @@ static void get_final_status(grpc_call *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set && grpc_error_has_clear_grpc_status(status[i].error)) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details)) { + if (get_final_status_from(exec_ctx, call, status[i].error, + allow_ok_status != 0, set_value, + set_value_user_data, details)) { return; } } @@ -790,8 +790,9 @@ static void get_final_status(grpc_call *call, /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details)) { + if (get_final_status_from(exec_ctx, call, status[i].error, + allow_ok_status != 0, set_value, + set_value_user_data, details)) { return; } } @@ -1330,17 +1331,22 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } if (call->is_client) { - get_final_status(call, set_status_value_directly, + get_final_status(exec_ctx, call, set_status_value_directly, call->final_op.client.status, call->final_op.client.status_details); } else { - get_final_status(call, set_cancelled_value, + get_final_status(exec_ctx, call, set_cancelled_value, call->final_op.server.cancelled, NULL); } GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } + if (error != GRPC_ERROR_NONE && bctl->op.recv_message && + *call->receiving_buffer != NULL) { + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + } if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ @@ -1611,11 +1617,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, validate_filtered_metadata(exec_ctx, bctl); GPR_TIMER_END("validate_filtered_metadata", 0); - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - call->send_deadline = - gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC); + if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) { + call->send_deadline = md->deadline; } } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index c680139cf6..27c2f5243c 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -49,7 +49,7 @@ typedef struct grpc_call_create_args { grpc_mdelem *add_initial_metadata; size_t add_initial_metadata_count; - gpr_timespec send_deadline; + grpc_millis send_deadline; } grpc_call_create_args; /* Create a new call based on \a args. diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 59fced7bc4..860dcc82db 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -262,7 +262,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative, grpc_mdelem path_mdelem, - grpc_mdelem authority_mdelem, gpr_timespec deadline) { + grpc_mdelem authority_mdelem, grpc_millis deadline) { grpc_mdelem send_metadata[2]; size_t num_metadata = 0; @@ -308,7 +308,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, host != NULL ? grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_AUTHORITY, grpc_slice_ref_internal(*host)) : GRPC_MDNULL, - deadline); + grpc_timespec_to_millis_round_up(deadline)); grpc_exec_ctx_finish(&exec_ctx); return call; } @@ -316,7 +316,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_call *grpc_channel_create_pollset_set_call( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method, - const grpc_slice *host, gpr_timespec deadline, void *reserved) { + const grpc_slice *host, grpc_millis deadline, void *reserved) { GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( exec_ctx, channel, parent_call, propagation_mask, NULL, pollset_set, @@ -372,7 +372,8 @@ grpc_call *grpc_channel_create_registered_call( grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call *call = grpc_channel_create_call_internal( &exec_ctx, channel, parent_call, propagation_mask, completion_queue, NULL, - GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline); + GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), + grpc_timespec_to_millis_round_up(deadline)); grpc_exec_ctx_finish(&exec_ctx); return call; } diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 427422b565..4d1c7e369f 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -47,7 +47,7 @@ grpc_channel *grpc_channel_create_with_builder( grpc_call *grpc_channel_create_pollset_set_call( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, grpc_slice method, - const grpc_slice *host, gpr_timespec deadline, void *reserved); + const grpc_slice *host, grpc_millis deadline, void *reserved); /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h index 903b90a071..c77848794c 100644 --- a/src/core/lib/surface/channel_stack_type.h +++ b/src/core/lib/surface/channel_stack_type.h @@ -50,4 +50,4 @@ const char *grpc_channel_stack_type_string(grpc_channel_stack_type type); } #endif -#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_STACK_TYPE_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_STACK_TYPE_H */ diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 453646bd49..36b4b835f8 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -61,8 +61,7 @@ typedef struct { grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker); grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, grpc_millis deadline); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); @@ -100,8 +99,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx, static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker, - gpr_timespec now, - gpr_timespec deadline) { + grpc_millis deadline) { non_polling_poller *npp = (non_polling_poller *)pollset; if (npp->shutdown) return GRPC_ERROR_NONE; non_polling_worker w; @@ -115,7 +113,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, w.next->prev = w.prev->next = &w; } w.kicked = false; - while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + gpr_timespec deadline_ts = + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME); + while (!npp->shutdown && !w.kicked && + !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; if (&w == npp->root) { npp->root = w.next; @@ -743,7 +744,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, typedef struct { gpr_atm last_seen_things_queued_ever; grpc_completion_queue *cq; - gpr_timespec deadline; + grpc_millis deadline; grpc_cq_completion *stolen_completion; void *tag; /* for pluck */ bool first_loop; @@ -772,8 +773,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { return true; } } - return !a->first_loop && - gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); } #ifndef NDEBUG @@ -802,7 +802,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {} static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; - gpr_timespec now; cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -819,23 +818,20 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, dump_pending_tags(cq); - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cq, "next"); + grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { - gpr_atm_no_barrier_load(&cqd->things_queued_ever), cq, - deadline, + deadline_millis, NULL, NULL, true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); - for (;;) { - gpr_timespec iteration_deadline = deadline; + grpc_millis iteration_deadline = deadline_millis; if (is_finished_arg.stolen_completion != NULL) { grpc_cq_completion *c = is_finished_arg.stolen_completion; @@ -862,7 +858,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, attempt at popping. Not doing this can potentially deadlock this thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { - iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + iteration_deadline = 0; } } @@ -883,8 +879,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, break; } - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -895,7 +891,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, gpr_mu_lock(cq->mu); cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - NULL, now, iteration_deadline); + NULL, iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { @@ -1032,8 +1028,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { } gpr_mu_unlock(cq->mu); } - return !a->first_loop && - gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); } static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, @@ -1042,7 +1037,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; - gpr_timespec now; cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -1061,14 +1055,13 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cq, "pluck"); gpr_mu_lock(cq->mu); + grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { gpr_atm_no_barrier_load(&cqd->things_queued_ever), cq, - deadline, + deadline_millis, NULL, tag, true}; @@ -1120,8 +1113,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1129,10 +1122,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - &worker, now, deadline); + &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); diff --git a/src/core/lib/surface/completion_queue_factory.h b/src/core/lib/surface/completion_queue_factory.h index cb0af6f0fb..af8f3d60c3 100644 --- a/src/core/lib/surface/completion_queue_factory.h +++ b/src/core/lib/surface/completion_queue_factory.h @@ -41,4 +41,4 @@ struct grpc_completion_queue_factory { } #endif -#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H */ diff --git a/src/core/lib/surface/event_string.h b/src/core/lib/surface/event_string.h index 127609c404..2d53cf0fac 100644 --- a/src/core/lib/surface/event_string.h +++ b/src/core/lib/surface/event_string.h @@ -32,4 +32,4 @@ char *grpc_event_string(grpc_event *ev); } #endif -#endif /* GRPC_CORE_LIB_SURFACE_EVENT_STRING_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SURFACE_EVENT_STRING_H */ diff --git a/src/core/lib/surface/init.h b/src/core/lib/surface/init.h index b2f48576e5..d429026327 100644 --- a/src/core/lib/surface/init.h +++ b/src/core/lib/surface/init.h @@ -32,4 +32,4 @@ int grpc_is_initialized(void); } #endif -#endif /* GRPC_CORE_LIB_SURFACE_INIT_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SURFACE_INIT_H */ diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 6286f9159d..88e26cbeb7 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -74,7 +74,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, mdb->list.head = &calld->status; mdb->list.tail = &calld->details; mdb->list.count = 2; - mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + mdb->deadline = GRPC_MILLIS_INF_FUTURE; } static void lame_start_transport_stream_op_batch( diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 1d0fd472d0..dd09cb91de 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -137,7 +137,7 @@ struct call_data { bool host_set; grpc_slice path; grpc_slice host; - gpr_timespec deadline; + grpc_millis deadline; grpc_completion_queue *cq_new; @@ -492,11 +492,13 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_ASSERT(calld->path_set); rc->data.batch.details->host = grpc_slice_ref_internal(calld->host); rc->data.batch.details->method = grpc_slice_ref_internal(calld->path); - rc->data.batch.details->deadline = calld->deadline; + rc->data.batch.details->deadline = + grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC); rc->data.batch.details->flags = calld->recv_initial_metadata_flags; break; case REGISTERED_CALL: - *rc->data.registered.deadline = calld->deadline; + *rc->data.registered.deadline = + grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC); if (rc->data.registered.optional_payload) { *rc->data.registered.optional_payload = calld->payload; calld->payload = NULL; @@ -739,7 +741,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)ptr; call_data *calld = (call_data *)elem->call_data; - gpr_timespec op_deadline; + grpc_millis op_deadline; if (error == GRPC_ERROR_NONE) { GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != NULL); @@ -759,7 +761,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_REF(error); } op_deadline = calld->recv_initial_metadata->deadline; - if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { + if (op_deadline != GRPC_MILLIS_INF_FUTURE) { calld->deadline = op_deadline; } if (calld->host_set && calld->path_set) { @@ -833,7 +835,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, memset(&args, 0, sizeof(args)); args.channel = chand->channel; args.server_transport_data = transport_server_data; - args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + args.send_deadline = GRPC_MILLIS_INF_FUTURE; grpc_call *call; grpc_error *error = grpc_call_create(exec_ctx, &args, &call); grpc_call_element *elem = @@ -881,7 +883,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; memset(calld, 0, sizeof(call_data)); - calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + calld->deadline = GRPC_MILLIS_INF_FUTURE; calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 1114715833..375eab4a04 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -62,4 +62,4 @@ void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets, } #endif -#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */ diff --git a/src/core/lib/surface/validate_metadata.h b/src/core/lib/surface/validate_metadata.h index aa02419d9f..afc8be6dfd 100644 --- a/src/core/lib/surface/validate_metadata.h +++ b/src/core/lib/surface/validate_metadata.h @@ -33,4 +33,4 @@ grpc_error *grpc_validate_header_nonbin_value_is_legal(grpc_slice slice); } #endif -#endif /* GRPC_CORE_LIB_SURFACE_VALIDATE_METADATA_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_SURFACE_VALIDATE_METADATA_H */ diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index e7fa0eefe8..6ed427ce5c 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -30,8 +30,12 @@ grpc_tracer_flag grpc_bdp_estimator_trace = void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { estimator->estimate = 65536; estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; + estimator->ping_start_time = gpr_time_0(GPR_CLOCK_MONOTONIC); + estimator->next_ping_scheduled = 0; estimator->name = name; estimator->bw_est = 0; + estimator->inter_ping_delay = 100.0; // start at 100ms + estimator->stable_estimate_count = 0; } bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, @@ -51,10 +55,11 @@ void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, estimator->accumulator += num_bytes; } -bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator) { +bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx, + const grpc_bdp_estimator *estimator) { switch (estimator->ping_state) { case GRPC_BDP_PING_UNSCHEDULED: - return true; + return grpc_exec_ctx_now(exec_ctx) >= estimator->next_ping_scheduled; case GRPC_BDP_PING_SCHEDULED: return false; case GRPC_BDP_PING_STARTED: @@ -84,11 +89,13 @@ void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { estimator->ping_start_time = gpr_now(GPR_CLOCK_MONOTONIC); } -void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { - gpr_timespec dt_ts = - gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), estimator->ping_start_time); +void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx, + grpc_bdp_estimator *estimator) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_ts = gpr_time_sub(now, estimator->ping_start_time); double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0; + int start_inter_ping_delay = estimator->inter_ping_delay; if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 " dt=%lf bw=%lfMbs bw_est=%lfMbs", @@ -105,7 +112,26 @@ void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, estimator->name, estimator->estimate); } + estimator->inter_ping_delay /= 2; // if the ping estimate changes, + // exponentially get faster at probing + } else if (estimator->inter_ping_delay < 10000) { + estimator->stable_estimate_count++; + if (estimator->stable_estimate_count >= 2) { + estimator->inter_ping_delay += + 100 + + (int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady, + // slowly ramp down the probe time + } + } + if (start_inter_ping_delay != estimator->inter_ping_delay) { + estimator->stable_estimate_count = 0; + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", estimator->name, + estimator->inter_ping_delay); + } } estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; estimator->accumulator = 0; + estimator->next_ping_scheduled = + grpc_exec_ctx_now(exec_ctx) + estimator->inter_ping_delay; } diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 21c27ec6af..a9d986782c 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -23,6 +23,7 @@ #include <stdbool.h> #include <stdint.h> #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/exec_ctx.h" #define GRPC_BDP_SAMPLES 16 #define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3 @@ -43,7 +44,12 @@ typedef struct grpc_bdp_estimator { grpc_bdp_estimator_ping_state ping_state; int64_t accumulator; int64_t estimate; + // when was the current ping started? gpr_timespec ping_start_time; + // when should the next ping start? + grpc_millis next_ping_scheduled; + int inter_ping_delay; + int stable_estimate_count; double bw_est; const char *name; } grpc_bdp_estimator; @@ -59,7 +65,8 @@ bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw); void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, int64_t num_bytes); // Returns true if the user should schedule a ping -bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator); +bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx, + const grpc_bdp_estimator *estimator); // Schedule a ping: call in response to receiving a true from // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // transport (but not necessarily started) @@ -68,10 +75,11 @@ void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator); // the ping is on the wire void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator); // Completes a previously started ping -void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator); +void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx, + grpc_bdp_estimator *estimator); #ifdef __cplusplus } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */ diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index d3e04df5c0..c1d8ee543f 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -143,4 +143,4 @@ void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream); } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 1796a540a7..c0ba188148 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -92,4 +92,4 @@ bool grpc_connectivity_state_notify_on_state_change( } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/lib/transport/error_utils.cc b/src/core/lib/transport/error_utils.cc index 5e3920b627..2e3b61b7ab 100644 --- a/src/core/lib/transport/error_utils.cc +++ b/src/core/lib/transport/error_utils.cc @@ -39,8 +39,9 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, return NULL; } -void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, grpc_slice *slice, +void grpc_error_get_status(grpc_exec_ctx *exec_ctx, grpc_error *error, + grpc_millis deadline, grpc_status_code *code, + grpc_slice *slice, grpc_http2_error_code *http_error) { // Start with the parent error and recurse through the tree of children // until we find the first one that has a status code. @@ -63,8 +64,8 @@ void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, status = (grpc_status_code)integer; } else if (grpc_error_get_int(found_error, GRPC_ERROR_INT_HTTP2_ERROR, &integer)) { - status = grpc_http2_error_to_grpc_status((grpc_http2_error_code)integer, - deadline); + status = grpc_http2_error_to_grpc_status( + exec_ctx, (grpc_http2_error_code)integer, deadline); } if (code != NULL) *code = status; diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h index 18ff54839c..b4f9df4bf1 100644 --- a/src/core/lib/transport/error_utils.h +++ b/src/core/lib/transport/error_utils.h @@ -20,6 +20,7 @@ #define GRPC_CORE_LIB_TRANSPORT_ERROR_UTILS_H #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/http2_errors.h" #ifdef __cplusplus @@ -32,8 +33,9 @@ extern "C" { /// All attributes are pulled from the same child error. If any of the /// attributes (code, msg, http_status) are unneeded, they can be passed as /// NULL. -void grpc_error_get_status(grpc_error *error, gpr_timespec deadline, - grpc_status_code *code, grpc_slice *slice, +void grpc_error_get_status(grpc_exec_ctx *exec_ctx, grpc_error *error, + grpc_millis deadline, grpc_status_code *code, + grpc_slice *slice, grpc_http2_error_code *http_status); /// A utility function to check whether there is a clear status code that @@ -46,4 +48,4 @@ bool grpc_error_has_clear_grpc_status(grpc_error *error); } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_ERROR_UTILS_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_ERROR_UTILS_H */ diff --git a/src/core/lib/transport/metadata_batch.cc b/src/core/lib/transport/metadata_batch.cc index 54388bdcda..2df9c9189c 100644 --- a/src/core/lib/transport/metadata_batch.cc +++ b/src/core/lib/transport/metadata_batch.cc @@ -74,7 +74,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { void grpc_metadata_batch_init(grpc_metadata_batch *batch) { memset(batch, 0, sizeof(*batch)); - batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + batch->deadline = GRPC_MILLIS_INF_FUTURE; } void grpc_metadata_batch_destroy(grpc_exec_ctx *exec_ctx, @@ -270,9 +270,7 @@ void grpc_metadata_batch_clear(grpc_exec_ctx *exec_ctx, } bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { - return batch->list.head == NULL && - gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type), - batch->deadline) == 0; + return batch->list.head == NULL && batch->deadline == GRPC_MILLIS_INF_FUTURE; } size_t grpc_metadata_batch_size(grpc_metadata_batch *batch) { diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 63f30a78d1..a2b4b92385 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -51,9 +51,9 @@ typedef struct grpc_metadata_batch { grpc_mdelem_list list; grpc_metadata_batch_callouts idx; /** Used to calculate grpc-timeout at the point of sending, - or gpr_inf_future if this batch does not need to send a + or GRPC_MILLIS_INF_FUTURE if this batch does not need to send a grpc-timeout */ - gpr_timespec deadline; + grpc_millis deadline; } grpc_metadata_batch; void grpc_metadata_batch_init(grpc_metadata_batch *batch); diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h index 17feabfd39..80899e9a20 100644 --- a/src/core/lib/transport/pid_controller.h +++ b/src/core/lib/transport/pid_controller.h @@ -67,4 +67,4 @@ double grpc_pid_controller_last(grpc_pid_controller *pid_controller); } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */ diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index c485f52472..9c43093627 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -67,4 +67,4 @@ void* grpc_method_config_table_get(grpc_exec_ctx* exec_ctx, } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H */ diff --git a/src/core/lib/transport/status_conversion.cc b/src/core/lib/transport/status_conversion.cc index a40d333284..891c4427d7 100644 --- a/src/core/lib/transport/status_conversion.cc +++ b/src/core/lib/transport/status_conversion.cc @@ -37,8 +37,9 @@ grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status) { } } -grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, - gpr_timespec deadline) { +grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx *exec_ctx, + grpc_http2_error_code error, + grpc_millis deadline) { switch (error) { case GRPC_HTTP2_NO_ERROR: /* should never be received */ @@ -46,7 +47,7 @@ grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, case GRPC_HTTP2_CANCEL: /* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been * exceeded */ - return gpr_time_cmp(gpr_now(deadline.clock_type), deadline) >= 0 + return grpc_exec_ctx_now(exec_ctx) > deadline ? GRPC_STATUS_DEADLINE_EXCEEDED : GRPC_STATUS_CANCELLED; case GRPC_HTTP2_ENHANCE_YOUR_CALM: diff --git a/src/core/lib/transport/status_conversion.h b/src/core/lib/transport/status_conversion.h index b257998e4d..8ef91aecfe 100644 --- a/src/core/lib/transport/status_conversion.h +++ b/src/core/lib/transport/status_conversion.h @@ -20,6 +20,7 @@ #define GRPC_CORE_LIB_TRANSPORT_STATUS_CONVERSION_H #include <grpc/grpc.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/http2_errors.h" #ifdef __cplusplus @@ -28,8 +29,9 @@ extern "C" { /* Conversion of grpc status codes to http2 error codes (for RST_STREAM) */ grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status); -grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, - gpr_timespec deadline); +grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx *exec_ctx, + grpc_http2_error_code error, + grpc_millis deadline); /* Conversion of HTTP status codes (:status) to grpc status codes */ grpc_status_code grpc_http2_status_to_grpc_status(int status); @@ -39,4 +41,4 @@ int grpc_status_to_http2_status(grpc_status_code status); } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_STATUS_CONVERSION_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_STATUS_CONVERSION_H */ diff --git a/src/core/lib/transport/timeout_encoding.cc b/src/core/lib/transport/timeout_encoding.cc index 02f179d6a3..23a9ef308f 100644 --- a/src/core/lib/transport/timeout_encoding.cc +++ b/src/core/lib/transport/timeout_encoding.cc @@ -59,60 +59,27 @@ static void enc_seconds(char *buffer, int64_t sec) { } } -static void enc_nanos(char *buffer, int64_t x) { +static void enc_millis(char *buffer, int64_t x) { x = round_up_to_three_sig_figs(x); - if (x < 100000) { - if (x % 1000 == 0) { - enc_ext(buffer, x / 1000, 'u'); - } else { - enc_ext(buffer, x, 'n'); - } - } else if (x < 100000000) { - if (x % 1000000 == 0) { - enc_ext(buffer, x / 1000000, 'm'); - } else { - enc_ext(buffer, x / 1000, 'u'); - } - } else if (x < 1000000000) { - enc_ext(buffer, x / 1000000, 'm'); + if (x < GPR_MS_PER_SEC) { + enc_ext(buffer, x, 'm'); } else { - /* note that this is only ever called with times of less than one second, - so if we reach here the time must have been rounded up to a whole second - (and no more) */ - memcpy(buffer, "1S", 3); - } -} - -static void enc_micros(char *buffer, int64_t x) { - x = round_up_to_three_sig_figs(x); - if (x < 100000) { - if (x % 1000 == 0) { - enc_ext(buffer, x / 1000, 'm'); + if (x % GPR_MS_PER_SEC == 0) { + enc_seconds(buffer, x / GPR_MS_PER_SEC); } else { - enc_ext(buffer, x, 'u'); + enc_ext(buffer, x, 'm'); } - } else if (x < 100000000) { - if (x % 1000000 == 0) { - enc_ext(buffer, x / 1000000, 'S'); - } else { - enc_ext(buffer, x / 1000, 'm'); - } - } else { - enc_ext(buffer, x / 1000000, 'S'); } } -void grpc_http2_encode_timeout(gpr_timespec timeout, char *buffer) { - if (timeout.tv_sec < 0) { +void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer) { + if (timeout <= 0) { enc_tiny(buffer); - } else if (timeout.tv_sec == 0) { - enc_nanos(buffer, timeout.tv_nsec); - } else if (timeout.tv_sec < 1000 && timeout.tv_nsec != 0) { - enc_micros(buffer, - (int64_t)(timeout.tv_sec * 1000000) + - (timeout.tv_nsec / 1000 + (timeout.tv_nsec % 1000 != 0))); + } else if (timeout < 1000 * GPR_MS_PER_SEC) { + enc_millis(buffer, timeout); } else { - enc_seconds(buffer, timeout.tv_sec + (timeout.tv_nsec != 0)); + enc_seconds(buffer, + timeout / GPR_MS_PER_SEC + (timeout % GPR_MS_PER_SEC != 0)); } } @@ -121,8 +88,8 @@ static int is_all_whitespace(const char *p, const char *end) { return p == end; } -int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) { - int32_t x = 0; +int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout) { + grpc_millis x = 0; const uint8_t *p = GRPC_SLICE_START_PTR(text); const uint8_t *end = GRPC_SLICE_END_PTR(text); int have_digit = 0; @@ -136,7 +103,7 @@ int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) { /* spec allows max. 8 digits, but we allow values up to 1,000,000,000 */ if (x >= (100 * 1000 * 1000)) { if (x != (100 * 1000 * 1000) || digit != 0) { - *timeout = gpr_inf_future(GPR_TIMESPAN); + *timeout = GRPC_MILLIS_INF_FUTURE; return 1; } } @@ -150,22 +117,22 @@ int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout) { /* decode unit specifier */ switch (*p) { case 'n': - *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN); + *timeout = x / GPR_NS_PER_MS + (x % GPR_NS_PER_MS != 0); break; case 'u': - *timeout = gpr_time_from_micros(x, GPR_TIMESPAN); + *timeout = x / GPR_US_PER_MS + (x % GPR_US_PER_MS != 0); break; case 'm': - *timeout = gpr_time_from_millis(x, GPR_TIMESPAN); + *timeout = x; break; case 'S': - *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN); + *timeout = x * GPR_MS_PER_SEC; break; case 'M': - *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN); + *timeout = x * 60 * GPR_MS_PER_SEC; break; case 'H': - *timeout = gpr_time_from_hours(x, GPR_TIMESPAN); + *timeout = x * 60 * 60 * GPR_MS_PER_SEC; break; default: return 0; diff --git a/src/core/lib/transport/timeout_encoding.h b/src/core/lib/transport/timeout_encoding.h index 1f4e206f8a..91cdf0f728 100644 --- a/src/core/lib/transport/timeout_encoding.h +++ b/src/core/lib/transport/timeout_encoding.h @@ -22,6 +22,7 @@ #include <grpc/slice.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/string.h" #define GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE (GPR_LTOA_MIN_BUFSIZE + 1) @@ -32,11 +33,11 @@ extern "C" { /* Encode/decode timeouts to the GRPC over HTTP/2 format; encoding may round up arbitrarily */ -void grpc_http2_encode_timeout(gpr_timespec timeout, char *buffer); -int grpc_http2_decode_timeout(grpc_slice text, gpr_timespec *timeout); +void grpc_http2_encode_timeout(grpc_millis timeout, char *buffer); +int grpc_http2_decode_timeout(grpc_slice text, grpc_millis *timeout); #ifdef __cplusplus } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_TIMEOUT_ENCODING_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_TIMEOUT_ENCODING_H */ diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 41d34d3954..445fb41ab1 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -77,4 +77,4 @@ struct grpc_transport { } #endif -#endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_IMPL_H */
\ No newline at end of file +#endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_IMPL_H */ diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 87fdf72e29..cc11b0cc49 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -51,10 +51,9 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); put_metadata(b, m->md); } - if (gpr_time_cmp(md.deadline, gpr_inf_future(md.deadline.clock_type)) != 0) { + if (md.deadline != GRPC_MILLIS_INF_FUTURE) { char *tmp; - gpr_asprintf(&tmp, " deadline=%" PRId64 ".%09d", md.deadline.tv_sec, - md.deadline.tv_nsec); + gpr_asprintf(&tmp, " deadline=%" PRIdPTR, md.deadline); gpr_strvec_add(b, tmp); } } |