diff options
Diffstat (limited to 'src')
41 files changed, 1269 insertions, 189 deletions
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c index 17e946937f..7b8cf986f7 100644 --- a/src/core/ext/filters/load_reporting/load_reporting_filter.c +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c @@ -24,8 +24,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> -#include "src/core/ext/filters/load_reporting/load_reporting.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -213,7 +213,7 @@ static void lr_start_transport_stream_op_batch( GPR_TIMER_END("lr_start_transport_stream_op_batch", 0); } -const grpc_channel_filter grpc_load_reporting_filter = { +const grpc_channel_filter grpc_server_load_reporting_filter = { lr_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h index 1a5424e43a..9527868c9f 100644 --- a/src/core/ext/filters/load_reporting/load_reporting_filter.h +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h @@ -16,12 +16,13 @@ * */ -#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H -#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H +#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H -#include "src/core/ext/filters/load_reporting/load_reporting.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_stack.h" -extern const grpc_channel_filter grpc_load_reporting_filter; +extern const grpc_channel_filter grpc_server_load_reporting_filter; -#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H */ +#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/load_reporting/load_reporting.c b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c index 9745763c91..199cb883b3 100644 --- a/src/core/ext/filters/load_reporting/load_reporting.c +++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c @@ -25,8 +25,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/sync.h> -#include "src/core/ext/filters/load_reporting/load_reporting.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/call.h" @@ -37,14 +37,19 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) { grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false); } -static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx, - grpc_channel_stack_builder *builder, - void *arg) { +static bool maybe_add_server_load_reporting_filter( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); - if (is_load_reporting_enabled(args)) { - return grpc_channel_stack_builder_prepend_filter( - builder, (const grpc_channel_filter *)arg, NULL, NULL); + const grpc_channel_filter *filter = arg; + grpc_channel_stack_builder_iterator *it = + grpc_channel_stack_builder_iterator_find(builder, filter->name); + const bool already_has_load_reporting_filter = + !grpc_channel_stack_builder_iterator_is_end(it); + grpc_channel_stack_builder_iterator_destroy(it); + if (is_load_reporting_enabled(args) && !already_has_load_reporting_filter) { + return grpc_channel_stack_builder_prepend_filter(builder, filter, NULL, + NULL); } return true; } @@ -55,10 +60,10 @@ grpc_arg grpc_load_reporting_enable_arg() { /* Plugin registration */ -void grpc_load_reporting_plugin_init(void) { +void grpc_server_load_reporting_plugin_init(void) { grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_load_reporting_filter, - (void *)&grpc_load_reporting_filter); + maybe_add_server_load_reporting_filter, + (void *)&grpc_server_load_reporting_filter); } -void grpc_load_reporting_plugin_shutdown() {} +void grpc_server_load_reporting_plugin_shutdown() {} diff --git a/src/core/ext/filters/load_reporting/load_reporting.h b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h index fc04d2826a..65a6d0900e 100644 --- a/src/core/ext/filters/load_reporting/load_reporting.h +++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H -#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H +#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H +#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H #include <grpc/impl/codegen/grpc_types.h> @@ -55,4 +55,5 @@ typedef struct grpc_load_reporting_call_data { /** Return a \a grpc_arg enabling load reporting */ grpc_arg grpc_load_reporting_enable_arg(); -#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H */ +#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H \ + */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 2ed5562209..a0bfa1676c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -34,6 +34,7 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/compression/stream_compression.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" @@ -1240,6 +1241,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_transport_stream_op_batch_payload *op_payload = op->payload; grpc_chttp2_transport *t = s->t; + GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx); + if (GRPC_TRACER_ON(grpc_http_trace)) { char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, @@ -1273,11 +1276,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->cancel_stream) { + GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx); grpc_chttp2_cancel_stream(exec_ctx, t, s, op_payload->cancel_stream.cancel_error); } if (op->send_initial_metadata) { + GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(exec_ctx); GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1358,6 +1363,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->send_message) { + GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(exec_ctx); + GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( + exec_ctx, op->payload->send_message.send_message->length); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1402,6 +1410,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->send_trailing_metadata) { + GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(exec_ctx); GPR_ASSERT(s->send_trailing_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); @@ -1451,6 +1460,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->recv_initial_metadata) { + GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(exec_ctx); GPR_ASSERT(s->recv_initial_metadata_ready == NULL); s->recv_initial_metadata_ready = op_payload->recv_initial_metadata.recv_initial_metadata_ready; @@ -1466,6 +1476,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->recv_message) { + GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(exec_ctx); size_t already_received; GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(!s->pending_byte_stream); @@ -1487,6 +1498,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->recv_trailing_metadata) { + GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx); GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); s->recv_trailing_metadata = diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index cffd6c6983..dea7a9ad5b 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -22,6 +22,7 @@ #include <grpc/support/log.h> +#include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/http2_errors.h" @@ -115,6 +116,7 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_ping_create(false, pq->inflight_id)); + GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx); t->ping_state.last_ping_sent_time = grpc_exec_ctx_now(exec_ctx); t->ping_state.pings_before_data_required -= (t->ping_state.pings_before_data_required != 0); @@ -161,6 +163,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; + GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx); + GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); if (t->dirtied_local_settings && !t->sent_local_settings) { diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c index c369e33073..7f2b8e07ce 100644 --- a/src/core/lib/channel/channel_stack_builder.c +++ b/src/core/lib/channel/channel_stack_builder.c @@ -124,6 +124,20 @@ bool grpc_channel_stack_builder_move_prev( return true; } +grpc_channel_stack_builder_iterator *grpc_channel_stack_builder_iterator_find( + grpc_channel_stack_builder *builder, const char *filter_name) { + GPR_ASSERT(filter_name != NULL); + grpc_channel_stack_builder_iterator *it = + grpc_channel_stack_builder_create_iterator_at_first(builder); + while (grpc_channel_stack_builder_move_next(it)) { + if (grpc_channel_stack_builder_iterator_is_end(it)) break; + const char *filter_name_at_it = + grpc_channel_stack_builder_iterator_filter_name(it); + if (strcmp(filter_name, filter_name_at_it) == 0) break; + } + return it; +} + bool grpc_channel_stack_builder_move_prev( grpc_channel_stack_builder_iterator *iterator); @@ -169,6 +183,21 @@ bool grpc_channel_stack_builder_append_filter( return ok; } +bool grpc_channel_stack_builder_remove_filter( + grpc_channel_stack_builder *builder, const char *filter_name) { + grpc_channel_stack_builder_iterator *it = + grpc_channel_stack_builder_iterator_find(builder, filter_name); + if (grpc_channel_stack_builder_iterator_is_end(it)) { + grpc_channel_stack_builder_iterator_destroy(it); + return false; + } + it->node->prev->next = it->node->next; + it->node->next->prev = it->node->prev; + gpr_free(it->node); + grpc_channel_stack_builder_iterator_destroy(it); + return true; +} + bool grpc_channel_stack_builder_prepend_filter( grpc_channel_stack_builder *builder, const grpc_channel_filter *filter, grpc_post_filter_create_init_func post_init_func, void *user_data) { diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index d43e427962..fdff2a2b6d 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -95,6 +95,11 @@ bool grpc_channel_stack_builder_move_next( bool grpc_channel_stack_builder_move_prev( grpc_channel_stack_builder_iterator *iterator); +/// Return an iterator at \a filter_name, or at the end of the list if not +/// found. +grpc_channel_stack_builder_iterator *grpc_channel_stack_builder_iterator_find( + grpc_channel_stack_builder *builder, const char *filter_name); + typedef void (*grpc_post_filter_create_init_func)( grpc_channel_stack *channel_stack, grpc_channel_element *elem, void *arg); @@ -132,6 +137,11 @@ bool grpc_channel_stack_builder_append_filter( grpc_post_filter_create_init_func post_init_func, void *user_data) GRPC_MUST_USE_RESULT; +/// Remove any filter whose name is \a filter_name from \a builder. Returns true +/// if \a filter_name was not found. +bool grpc_channel_stack_builder_remove_filter( + grpc_channel_stack_builder *builder, const char *filter_name); + /// Terminate iteration and destroy \a iterator void grpc_channel_stack_builder_iterator_destroy( grpc_channel_stack_builder_iterator *iterator); diff --git a/src/core/lib/debug/stats.c b/src/core/lib/debug/stats.c index 4dbd94c724..91ca0aa76e 100644 --- a/src/core/lib/debug/stats.c +++ b/src/core/lib/debug/stats.c @@ -45,7 +45,95 @@ void grpc_stats_collect(grpc_stats_data *output) { output->counters[i] += gpr_atm_no_barrier_load( &grpc_stats_per_cpu_storage[core].counters[i]); } + for (size_t i = 0; i < GRPC_STATS_HISTOGRAM_BUCKETS; i++) { + output->histograms[i] += gpr_atm_no_barrier_load( + &grpc_stats_per_cpu_storage[core].histograms[i]); + } + } +} + +void grpc_stats_diff(const grpc_stats_data *b, const grpc_stats_data *a, + grpc_stats_data *c) { + for (size_t i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + c->counters[i] = b->counters[i] - a->counters[i]; + } + for (size_t i = 0; i < GRPC_STATS_HISTOGRAM_BUCKETS; i++) { + c->histograms[i] = b->histograms[i] - a->histograms[i]; + } +} + +int grpc_stats_histo_find_bucket_slow(grpc_exec_ctx *exec_ctx, int value, + const int *table, int table_size) { + GRPC_STATS_INC_HISTOGRAM_SLOW_LOOKUPS(exec_ctx); + const int *const start = table; + while (table_size > 0) { + int step = table_size / 2; + const int *it = table + step; + if (value >= *it) { + table = it + 1; + table_size -= step + 1; + } else { + table_size = step; + } + } + return (int)(table - start) - 1; +} + +size_t grpc_stats_histo_count(const grpc_stats_data *stats, + grpc_stats_histograms histogram) { + size_t sum = 0; + for (int i = 0; i < grpc_stats_histo_buckets[histogram]; i++) { + sum += (size_t)stats->histograms[grpc_stats_histo_start[histogram] + i]; + } + return sum; +} + +static double threshold_for_count_below(const gpr_atm *bucket_counts, + const int *bucket_boundaries, + int num_buckets, double count_below) { + double count_so_far; + double lower_bound; + double upper_bound; + int lower_idx; + int upper_idx; + + /* find the lowest bucket that gets us above count_below */ + count_so_far = 0.0; + for (lower_idx = 0; lower_idx < num_buckets; lower_idx++) { + count_so_far += (double)bucket_counts[lower_idx]; + if (count_so_far >= count_below) { + break; + } } + if (count_so_far == count_below) { + /* this bucket hits the threshold exactly... we should be midway through + any run of zero values following the bucket */ + for (upper_idx = lower_idx + 1; upper_idx < num_buckets; upper_idx++) { + if (bucket_counts[upper_idx]) { + break; + } + } + return (bucket_boundaries[lower_idx] + bucket_boundaries[upper_idx]) / 2.0; + } else { + /* treat values as uniform throughout the bucket, and find where this value + should lie */ + lower_bound = bucket_boundaries[lower_idx]; + upper_bound = bucket_boundaries[lower_idx + 1]; + return upper_bound - + (upper_bound - lower_bound) * (count_so_far - count_below) / + (double)bucket_counts[lower_idx]; + } +} + +double grpc_stats_histo_percentile(const grpc_stats_data *stats, + grpc_stats_histograms histogram, + double percentile) { + size_t count = grpc_stats_histo_count(stats, histogram); + if (count == 0) return 0.0; + return threshold_for_count_below( + stats->histograms + grpc_stats_histo_start[histogram], + grpc_stats_histo_bucket_boundaries[histogram], + grpc_stats_histo_buckets[histogram], (double)count * percentile / 100.0); } char *grpc_stats_data_as_json(const grpc_stats_data *data) { @@ -60,6 +148,25 @@ char *grpc_stats_data_as_json(const grpc_stats_data *data) { gpr_strvec_add(&v, tmp); is_first = false; } + for (size_t i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + gpr_asprintf(&tmp, "%s\"%s\": [", is_first ? "" : ", ", + grpc_stats_histogram_name[i]); + gpr_strvec_add(&v, tmp); + for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) { + gpr_asprintf(&tmp, "%s%" PRIdPTR, j == 0 ? "" : ",", + data->histograms[grpc_stats_histo_start[i] + j]); + gpr_strvec_add(&v, tmp); + } + gpr_asprintf(&tmp, "], \"%s_bkt\": [", grpc_stats_histogram_name[i]); + gpr_strvec_add(&v, tmp); + for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) { + gpr_asprintf(&tmp, "%s%d", j == 0 ? "" : ",", + grpc_stats_histo_bucket_boundaries[i][j]); + gpr_strvec_add(&v, tmp); + } + gpr_strvec_add(&v, gpr_strdup("]")); + is_first = false; + } gpr_strvec_add(&v, gpr_strdup("}")); tmp = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); diff --git a/src/core/lib/debug/stats.h b/src/core/lib/debug/stats.h index 563b108dff..09d190d488 100644 --- a/src/core/lib/debug/stats.h +++ b/src/core/lib/debug/stats.h @@ -25,6 +25,7 @@ typedef struct grpc_stats_data { gpr_atm counters[GRPC_STATS_COUNTER_COUNT]; + gpr_atm histograms[GRPC_STATS_HISTOGRAM_BUCKETS]; } grpc_stats_data; extern grpc_stats_data *grpc_stats_per_cpu_storage; @@ -36,9 +37,25 @@ extern grpc_stats_data *grpc_stats_per_cpu_storage; (gpr_atm_no_barrier_fetch_add( \ &GRPC_THREAD_STATS_DATA((exec_ctx))->counters[(ctr)], 1)) +#define GRPC_STATS_INC_HISTOGRAM(exec_ctx, histogram, index) \ + (gpr_atm_no_barrier_fetch_add( \ + &GRPC_THREAD_STATS_DATA((exec_ctx)) \ + ->histograms[histogram##_FIRST_SLOT + (index)], \ + 1)) + void grpc_stats_init(void); void grpc_stats_shutdown(void); void grpc_stats_collect(grpc_stats_data *output); +// c = b-a +void grpc_stats_diff(const grpc_stats_data *b, const grpc_stats_data *a, + grpc_stats_data *c); char *grpc_stats_data_as_json(const grpc_stats_data *data); +int grpc_stats_histo_find_bucket_slow(grpc_exec_ctx *exec_ctx, int value, + const int *table, int table_size); +double grpc_stats_histo_percentile(const grpc_stats_data *data, + grpc_stats_histograms histogram, + double percentile); +size_t grpc_stats_histo_count(const grpc_stats_data *data, + grpc_stats_histograms histogram); #endif diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index 2203358a7e..15ccaf21c4 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -19,7 +19,269 @@ */ #include "src/core/lib/debug/stats_data.h" +#include <grpc/support/useful.h> +#include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/exec_ctx.h" const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { - "client_calls_created", "server_calls_created", "syscall_write", - "syscall_read", "syscall_poll", "syscall_wait", + "client_calls_created", + "server_calls_created", + "syscall_poll", + "syscall_wait", + "histogram_slow_lookups", + "syscall_write", + "syscall_read", + "http2_op_batches", + "http2_op_cancel", + "http2_op_send_initial_metadata", + "http2_op_send_message", + "http2_op_send_trailing_metadata", + "http2_op_recv_initial_metadata", + "http2_op_recv_message", + "http2_op_recv_trailing_metadata", + "http2_pings_sent", + "http2_writes_begun", + "combiner_locks_initiated", + "combiner_locks_scheduled_items", + "combiner_locks_scheduled_final_items", + "combiner_locks_offloaded", + "executor_scheduled_items", + "executor_scheduled_to_self", + "executor_wakeup_initiated", + "executor_queue_drained", }; +const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { + "Number of client side calls created by this process", + "Number of server side calls created by this process", + "Number of polling syscalls (epoll_wait, poll, etc) made by this process", + "Number of sleeping syscalls made by this process", + "Number of times histogram increments went through the slow (binary " + "search) path", + "Number of write syscalls (or equivalent - eg sendmsg) made by this " + "process", + "Number of read syscalls (or equivalent - eg recvmsg) made by this process", + "Number of batches received by HTTP2 transport", + "Number of cancelations received by HTTP2 transport", + "Number of batches containing send initial metadata", + "Number of batches containing send message", + "Number of batches containing send trailing metadata", + "Number of batches containing receive initial metadata", + "Number of batches containing receive message", + "Number of batches containing receive trailing metadata", + "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated", + "Number of combiner lock entries by process (first items queued to a " + "combiner)", + "Number of items scheduled against combiner locks", + "Number of final items scheduled against combiner locks", + "Number of combiner locks offloaded to different threads", + "Number of closures scheduled against the executor (gRPC thread pool)", + "Number of closures scheduled by the executor to the executor", + "Number of thread wakeups initiated within the executor", + "Number of times an executor queue was drained", +}; +const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { + "tcp_write_size", "tcp_write_iov_size", "tcp_read_size", + "tcp_read_offer", "tcp_read_offer_iov_size", "http2_send_message_size", +}; +const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { + "Number of bytes offered to each syscall_write", + "Number of byte segments offered to each syscall_write", + "Number of bytes received by each syscall_read", + "Number of bytes offered to each syscall_read", + "Number of byte segments offered to each syscall_read", + "Size of messages received by HTTP2 transport", +}; +const int grpc_stats_table_0[65] = { + 0, 1, 2, 3, 4, 6, 8, 11, + 15, 20, 26, 34, 44, 57, 73, 94, + 121, 155, 199, 255, 327, 419, 537, 688, + 881, 1128, 1444, 1848, 2365, 3026, 3872, 4954, + 6338, 8108, 10373, 13270, 16976, 21717, 27782, 35541, + 45467, 58165, 74409, 95189, 121772, 155778, 199281, 254933, + 326126, 417200, 533707, 682750, 873414, 1117323, 1429345, 1828502, + 2339127, 2992348, 3827987, 4896985, 6264509, 8013925, 10251880, 13114801, + 16777216}; +const uint8_t grpc_stats_table_1[87] = { + 0, 0, 1, 1, 2, 3, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 11, + 11, 12, 13, 13, 14, 15, 15, 16, 17, 17, 18, 19, 20, 20, 21, 22, 22, 23, + 24, 25, 25, 26, 27, 27, 28, 29, 29, 30, 31, 31, 32, 33, 34, 34, 35, 36, + 36, 37, 38, 39, 39, 40, 41, 41, 42, 43, 44, 44, 45, 45, 46, 47, 48, 48, + 49, 50, 51, 51, 52, 53, 53, 54, 55, 56, 56, 57, 58, 58, 59}; +const int grpc_stats_table_2[65] = { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, + 14, 16, 18, 20, 22, 24, 27, 30, 33, 36, 39, 43, 47, + 51, 56, 61, 66, 72, 78, 85, 92, 100, 109, 118, 128, 139, + 151, 164, 178, 193, 209, 226, 244, 264, 285, 308, 333, 359, 387, + 418, 451, 486, 524, 565, 609, 656, 707, 762, 821, 884, 952, 1024}; +const uint8_t grpc_stats_table_3[102] = { + 0, 0, 0, 1, 1, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, + 6, 7, 7, 7, 8, 8, 9, 9, 10, 11, 11, 12, 12, 13, 13, 14, 14, + 14, 15, 15, 16, 16, 17, 17, 18, 19, 19, 20, 20, 21, 21, 22, 22, 23, + 23, 24, 24, 24, 25, 26, 27, 27, 28, 28, 29, 29, 30, 30, 31, 31, 32, + 32, 33, 33, 34, 35, 35, 36, 37, 37, 38, 38, 39, 39, 40, 40, 41, 41, + 42, 42, 43, 44, 44, 45, 46, 46, 47, 48, 48, 49, 49, 50, 50, 51, 51}; +void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) { + value = GPR_CLAMP(value, 0, 16777216); + if (value < 5) { + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, + value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4683743612465315840ull) { + int bucket = + grpc_stats_table_1[((_val.uint - 4617315517961601024ull) >> 50)] + 5; + _bkt.dbl = grpc_stats_table_0[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, + bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_0, 64)); +} +void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_2, 64)); +} +void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) { + value = GPR_CLAMP(value, 0, 16777216); + if (value < 5) { + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, + value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4683743612465315840ull) { + int bucket = + grpc_stats_table_1[((_val.uint - 4617315517961601024ull) >> 50)] + 5; + _bkt.dbl = grpc_stats_table_0[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, + bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_0, 64)); +} +void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { + value = GPR_CLAMP(value, 0, 16777216); + if (value < 5) { + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, + value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4683743612465315840ull) { + int bucket = + grpc_stats_table_1[((_val.uint - 4617315517961601024ull) >> 50)] + 5; + _bkt.dbl = grpc_stats_table_0[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, + bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_0, 64)); +} +void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, + int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_2, 64)); +} +void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, + int value) { + value = GPR_CLAMP(value, 0, 16777216); + if (value < 5) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4683743612465315840ull) { + int bucket = + grpc_stats_table_1[((_val.uint - 4617315517961601024ull) >> 50)] + 5; + _bkt.dbl = grpc_stats_table_0[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_0, 64)); +} +const int grpc_stats_histo_buckets[6] = {64, 64, 64, 64, 64, 64}; +const int grpc_stats_histo_start[6] = {0, 64, 128, 192, 256, 320}; +const int *const grpc_stats_histo_bucket_boundaries[6] = { + grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0, + grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0}; +void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, int x) = { + grpc_stats_inc_tcp_write_size, + grpc_stats_inc_tcp_write_iov_size, + grpc_stats_inc_tcp_read_size, + grpc_stats_inc_tcp_read_offer, + grpc_stats_inc_tcp_read_offer_iov_size, + grpc_stats_inc_http2_send_message_size}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index c9c2f65c30..3151e5ab5c 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -21,27 +21,148 @@ #ifndef GRPC_CORE_LIB_DEBUG_STATS_DATA_H #define GRPC_CORE_LIB_DEBUG_STATS_DATA_H +#include <inttypes.h> +#include "src/core/lib/iomgr/exec_ctx.h" + typedef enum { GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED, GRPC_STATS_COUNTER_SERVER_CALLS_CREATED, - GRPC_STATS_COUNTER_SYSCALL_WRITE, - GRPC_STATS_COUNTER_SYSCALL_READ, GRPC_STATS_COUNTER_SYSCALL_POLL, GRPC_STATS_COUNTER_SYSCALL_WAIT, + GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS, + GRPC_STATS_COUNTER_SYSCALL_WRITE, + GRPC_STATS_COUNTER_SYSCALL_READ, + GRPC_STATS_COUNTER_HTTP2_OP_BATCHES, + GRPC_STATS_COUNTER_HTTP2_OP_CANCEL, + GRPC_STATS_COUNTER_HTTP2_OP_SEND_INITIAL_METADATA, + GRPC_STATS_COUNTER_HTTP2_OP_SEND_MESSAGE, + GRPC_STATS_COUNTER_HTTP2_OP_SEND_TRAILING_METADATA, + GRPC_STATS_COUNTER_HTTP2_OP_RECV_INITIAL_METADATA, + GRPC_STATS_COUNTER_HTTP2_OP_RECV_MESSAGE, + GRPC_STATS_COUNTER_HTTP2_OP_RECV_TRAILING_METADATA, + GRPC_STATS_COUNTER_HTTP2_PINGS_SENT, + GRPC_STATS_COUNTER_HTTP2_WRITES_BEGUN, + GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED, + GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS, + GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS, + GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED, + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_ITEMS, + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF, + GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED, + GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; +extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; +extern const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT]; +typedef enum { + GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, + GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, + GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, + GRPC_STATS_HISTOGRAM_COUNT +} grpc_stats_histograms; +extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT]; +extern const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT]; +typedef enum { + GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE_FIRST_SLOT = 0, + GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE_FIRST_SLOT = 64, + GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_TCP_READ_SIZE_FIRST_SLOT = 128, + GRPC_STATS_HISTOGRAM_TCP_READ_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_FIRST_SLOT = 192, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_FIRST_SLOT = 256, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 320, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_BUCKETS = 384 +} grpc_stats_histogram_constants; #define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) #define GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CALLS_CREATED) -#define GRPC_STATS_INC_SYSCALL_WRITE(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WRITE) -#define GRPC_STATS_INC_SYSCALL_READ(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_READ) #define GRPC_STATS_INC_SYSCALL_POLL(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_POLL) #define GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WAIT) -extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; +#define GRPC_STATS_INC_HISTOGRAM_SLOW_LOOKUPS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS) +#define GRPC_STATS_INC_SYSCALL_WRITE(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WRITE) +#define GRPC_STATS_INC_SYSCALL_READ(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_READ) +#define GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_OP_BATCHES) +#define GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_OP_CANCEL) +#define GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_OP_SEND_INITIAL_METADATA) +#define GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_OP_SEND_MESSAGE) +#define GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_OP_SEND_TRAILING_METADATA) +#define GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_OP_RECV_INITIAL_METADATA) +#define GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_OP_RECV_MESSAGE) +#define GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_OP_RECV_TRAILING_METADATA) +#define GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PINGS_SENT) +#define GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_BEGUN) +#define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED) +#define GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS) +#define GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS) +#define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED) +#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_ITEMS) +#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF) +#define GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED) +#define GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED) +#define GRPC_STATS_INC_TCP_WRITE_SIZE(exec_ctx, value) \ + grpc_stats_inc_tcp_write_size((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(exec_ctx, value) \ + grpc_stats_inc_tcp_write_iov_size((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_TCP_READ_SIZE(exec_ctx, value) \ + grpc_stats_inc_tcp_read_size((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, value) \ + grpc_stats_inc_tcp_read_offer((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, value) \ + grpc_stats_inc_tcp_read_offer_iov_size((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \ + grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value)) +void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x); +extern const int grpc_stats_histo_buckets[6]; +extern const int grpc_stats_histo_start[6]; +extern const int *const grpc_stats_histo_bucket_boundaries[6]; +extern void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, + int x); #endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */ diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 8afe48f5cd..53f6ff0074 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -1,9 +1,100 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + # Stats data declaration -# use tools/codegen/core/gen_stats_data.py to turn this into stats_data.h +# use tools / codegen / core / gen_stats_data.py to turn this into stats_data.h +# overall - counter: client_calls_created + doc: Number of client side calls created by this process - counter: server_calls_created -- counter: syscall_write -- counter: syscall_read + doc: Number of server side calls created by this process +# polling - counter: syscall_poll + doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process - counter: syscall_wait + doc: Number of sleeping syscalls made by this process +# stats system +- counter: histogram_slow_lookups + doc: Number of times histogram increments went through the slow + (binary search) path +# tcp +- counter: syscall_write + doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process +- counter: syscall_read + doc: Number of read syscalls (or equivalent - eg recvmsg) made by this process +- histogram: tcp_write_size + max: 16777216 # 16 meg max write tracked + buckets: 64 + doc: Number of bytes offered to each syscall_write +- histogram: tcp_write_iov_size + max: 1024 + buckets: 64 + doc: Number of byte segments offered to each syscall_write +- histogram: tcp_read_size + max: 16777216 + buckets: 64 + doc: Number of bytes received by each syscall_read +- histogram: tcp_read_offer + max: 16777216 + buckets: 64 + doc: Number of bytes offered to each syscall_read +- histogram: tcp_read_offer_iov_size + max: 1024 + buckets: 64 + doc: Number of byte segments offered to each syscall_read +# chttp2 +- counter: http2_op_batches + doc: Number of batches received by HTTP2 transport +- counter: http2_op_cancel + doc: Number of cancelations received by HTTP2 transport +- counter: http2_op_send_initial_metadata + doc: Number of batches containing send initial metadata +- counter: http2_op_send_message + doc: Number of batches containing send message +- counter: http2_op_send_trailing_metadata + doc: Number of batches containing send trailing metadata +- counter: http2_op_recv_initial_metadata + doc: Number of batches containing receive initial metadata +- counter: http2_op_recv_message + doc: Number of batches containing receive message +- counter: http2_op_recv_trailing_metadata + doc: Number of batches containing receive trailing metadata +- histogram: http2_send_message_size + max: 16777216 + buckets: 64 + doc: Size of messages received by HTTP2 transport +- counter: http2_pings_sent + doc: Number of HTTP2 pings sent by process +- counter: http2_writes_begun + doc: Number of HTTP2 writes initiated +# combiner locks +- counter: combiner_locks_initiated + doc: Number of combiner lock entries by process + (first items queued to a combiner) +- counter: combiner_locks_scheduled_items + doc: Number of items scheduled against combiner locks +- counter: combiner_locks_scheduled_final_items + doc: Number of final items scheduled against combiner locks +- counter: combiner_locks_offloaded + doc: Number of combiner locks offloaded to different threads +# executor +- counter: executor_scheduled_items + doc: Number of closures scheduled against the executor (gRPC thread pool) +- counter: executor_scheduled_to_self + doc: Number of closures scheduled by the executor to the executor +- counter: executor_wakeup_initiated + doc: Number of thread wakeups initiated within the executor +- counter: executor_queue_drained + doc: Number of times an executor queue was drained diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 9b66987b68..4c1503bddb 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -24,6 +24,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" @@ -153,6 +154,7 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, grpc_error *error) { + GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx); GPR_TIMER_BEGIN("combiner.execute", 0); grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); @@ -160,6 +162,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); if (last == 1) { + GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks @@ -195,6 +198,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx); move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE); @@ -325,6 +329,7 @@ static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { + GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(exec_ctx); grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 7621a7fe75..dd5cb2a64e 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -28,6 +28,7 @@ #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/spinlock.h" @@ -145,6 +146,7 @@ static void executor_thread(void *arg) { gpr_mu_unlock(&ts->mu); break; } + GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); @@ -158,6 +160,7 @@ static void executor_thread(void *arg) { static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx); if (cur_thread_count == 0) { grpc_closure_list_append(&exec_ctx->closure_list, closure, error); return; @@ -165,9 +168,12 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); if (ts == NULL) { ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + } else { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); } gpr_mu_lock(&ts->mu); if (grpc_closure_list_empty(ts->elems)) { + GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); gpr_cv_signal(&ts->cv); } grpc_closure_list_append(&ts->elems, closure, error); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 6c620ca245..3372e14eef 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -255,6 +255,9 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; + GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length); + GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count); + GPR_TIMER_BEGIN("recvmsg", 0); do { GRPC_STATS_INC_SYSCALL_READ(exec_ctx); @@ -285,6 +288,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { + GRPC_STATS_INC_TCP_READ_SIZE(exec_ctx, read_bytes); add_to_estimate(tcp, (size_t)read_bytes); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { @@ -401,6 +405,9 @@ static bool tcp_flush(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, msg.msg_controllen = 0; msg.msg_flags = 0; + GRPC_STATS_INC_TCP_WRITE_SIZE(exec_ctx, sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(exec_ctx, iov_size); + GPR_TIMER_BEGIN("sendmsg", 1); do { /* TODO(klempner): Cork if this is a partial write */ diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index 89c997bdca..e51cb44fbc 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -43,6 +43,10 @@ typedef struct grpc_timer grpc_timer; void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_millis deadline, grpc_closure *closure); +/* Initialize *timer without setting it. This can later be passed through + the regular init or cancel */ +void grpc_timer_init_unset(grpc_timer *timer); + /* Note that there is no timer destroy function. This is because the timer is a one-time occurrence with a guarantee that the callback will be called exactly once, either at expiration or cancellation. Thus, all diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 8ed0362041..c69084d680 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -194,6 +194,8 @@ static void note_deadline_change(timer_shard *shard) { } } +void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } + void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_millis deadline, grpc_closure *closure) { int is_first_timer = 0; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 70f49bcbe8..adced41f53 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -77,6 +77,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_unref((uv_handle_t *)uv_timer); } +void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = 0; } + void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 5e41b94ff8..ae5633b82c 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -34,7 +34,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -#include "src/core/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_grpc.h" #define STAGING_BUFFER_SIZE 8192 @@ -42,6 +42,7 @@ typedef struct { grpc_endpoint base; grpc_endpoint *wrapped_ep; struct tsi_frame_protector *protector; + struct tsi_zero_copy_grpc_protector *zero_copy_protector; gpr_mu protector_mu; /* saved upper level callbacks and user_data. */ grpc_closure *read_cb; @@ -67,6 +68,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) { secure_endpoint *ep = secure_ep; grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); + tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector); grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes); grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer); grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer); @@ -159,51 +161,58 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, return; } - /* TODO(yangg) check error, maybe bail out early */ - for (i = 0; i < ep->source_buffer.count; i++) { - grpc_slice encrypted = ep->source_buffer.slices[i]; - uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted); - size_t message_size = GRPC_SLICE_LENGTH(encrypted); - - while (message_size > 0 || keep_looping) { - size_t unprotected_buffer_size_written = (size_t)(end - cur); - size_t processed_message_size = message_size; - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_unprotect(ep->protector, message_bytes, - &processed_message_size, cur, - &unprotected_buffer_size_written); - gpr_mu_unlock(&ep->protector_mu); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Decryption error: %s", - tsi_result_to_string(result)); - break; - } - message_bytes += processed_message_size; - message_size -= processed_message_size; - cur += unprotected_buffer_size_written; - - if (cur == end) { - flush_read_staging_buffer(ep, &cur, &end); - /* Force to enter the loop again to extract buffered bytes in protector. - The bytes could be buffered because of running out of staging_buffer. - If this happens at the end of all slices, doing another unprotect - avoids leaving data in the protector. */ - keep_looping = 1; - } else if (unprotected_buffer_size_written > 0) { - keep_looping = 1; - } else { - keep_looping = 0; + if (ep->zero_copy_protector != NULL) { + // Use zero-copy grpc protector to unprotect. + result = tsi_zero_copy_grpc_protector_unprotect( + exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); + } else { + // Use frame protector to unprotect. + /* TODO(yangg) check error, maybe bail out early */ + for (i = 0; i < ep->source_buffer.count; i++) { + grpc_slice encrypted = ep->source_buffer.slices[i]; + uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted); + size_t message_size = GRPC_SLICE_LENGTH(encrypted); + + while (message_size > 0 || keep_looping) { + size_t unprotected_buffer_size_written = (size_t)(end - cur); + size_t processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_unprotect( + ep->protector, message_bytes, &processed_message_size, cur, + &unprotected_buffer_size_written); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Decryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += unprotected_buffer_size_written; + + if (cur == end) { + flush_read_staging_buffer(ep, &cur, &end); + /* Force to enter the loop again to extract buffered bytes in + protector. The bytes could be buffered because of running out of + staging_buffer. If this happens at the end of all slices, doing + another unprotect avoids leaving data in the protector. */ + keep_looping = 1; + } else if (unprotected_buffer_size_written > 0) { + keep_looping = 1; + } else { + keep_looping = 0; + } } + if (result != TSI_OK) break; } - if (result != TSI_OK) break; - } - if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) { - grpc_slice_buffer_add( - ep->read_buffer, - grpc_slice_split_head( - &ep->read_staging_buffer, - (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer)))); + if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) { + grpc_slice_buffer_add( + ep->read_buffer, + grpc_slice_split_head( + &ep->read_staging_buffer, + (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer)))); + } } /* TODO(yangg) experiment with moving this block after read_cb to see if it @@ -270,54 +279,62 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, } } - for (i = 0; i < slices->count; i++) { - grpc_slice plain = slices->slices[i]; - uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain); - size_t message_size = GRPC_SLICE_LENGTH(plain); - while (message_size > 0) { - size_t protected_buffer_size_to_send = (size_t)(end - cur); - size_t processed_message_size = message_size; - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_protect(ep->protector, message_bytes, - &processed_message_size, cur, - &protected_buffer_size_to_send); - gpr_mu_unlock(&ep->protector_mu); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Encryption error: %s", - tsi_result_to_string(result)); - break; - } - message_bytes += processed_message_size; - message_size -= processed_message_size; - cur += protected_buffer_size_to_send; - - if (cur == end) { - flush_write_staging_buffer(ep, &cur, &end); + if (ep->zero_copy_protector != NULL) { + // Use zero-copy grpc protector to protect. + result = tsi_zero_copy_grpc_protector_protect( + exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer); + } else { + // Use frame protector to protect. + for (i = 0; i < slices->count; i++) { + grpc_slice plain = slices->slices[i]; + uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain); + size_t message_size = GRPC_SLICE_LENGTH(plain); + while (message_size > 0) { + size_t protected_buffer_size_to_send = (size_t)(end - cur); + size_t processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect(ep->protector, message_bytes, + &processed_message_size, cur, + &protected_buffer_size_to_send); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Encryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += protected_buffer_size_to_send; + + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } } - } - if (result != TSI_OK) break; - } - if (result == TSI_OK) { - size_t still_pending_size; - do { - size_t protected_buffer_size_to_send = (size_t)(end - cur); - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_protect_flush(ep->protector, cur, - &protected_buffer_size_to_send, - &still_pending_size); - gpr_mu_unlock(&ep->protector_mu); if (result != TSI_OK) break; - cur += protected_buffer_size_to_send; - if (cur == end) { - flush_write_staging_buffer(ep, &cur, &end); + } + if (result == TSI_OK) { + size_t still_pending_size; + do { + size_t protected_buffer_size_to_send = (size_t)(end - cur); + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect_flush( + ep->protector, cur, &protected_buffer_size_to_send, + &still_pending_size); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) break; + cur += protected_buffer_size_to_send; + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } while (still_pending_size > 0); + if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) { + grpc_slice_buffer_add( + &ep->output_buffer, + grpc_slice_split_head( + &ep->write_staging_buffer, + (size_t)(cur - + GRPC_SLICE_START_PTR(ep->write_staging_buffer)))); } - } while (still_pending_size > 0); - if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) { - grpc_slice_buffer_add( - &ep->output_buffer, - grpc_slice_split_head( - &ep->write_staging_buffer, - (size_t)(cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer)))); } } @@ -389,13 +406,16 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_get_fd}; grpc_endpoint *grpc_secure_endpoint_create( - struct tsi_frame_protector *protector, grpc_endpoint *transport, - grpc_slice *leftover_slices, size_t leftover_nslices) { + struct tsi_frame_protector *protector, + struct tsi_zero_copy_grpc_protector *zero_copy_protector, + grpc_endpoint *transport, grpc_slice *leftover_slices, + size_t leftover_nslices) { size_t i; secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint)); ep->base.vtable = &vtable; ep->wrapped_ep = transport; ep->protector = protector; + ep->zero_copy_protector = zero_copy_protector; grpc_slice_buffer_init(&ep->leftover_bytes); for (i = 0; i < leftover_nslices; i++) { grpc_slice_buffer_add(&ep->leftover_bytes, diff --git a/src/core/lib/security/transport/secure_endpoint.h b/src/core/lib/security/transport/secure_endpoint.h index 1c5555f3df..3323a6ff42 100644 --- a/src/core/lib/security/transport/secure_endpoint.h +++ b/src/core/lib/security/transport/secure_endpoint.h @@ -23,12 +23,17 @@ #include "src/core/lib/iomgr/endpoint.h" struct tsi_frame_protector; +struct tsi_zero_copy_grpc_protector; extern grpc_tracer_flag grpc_trace_secure_endpoint; -/* Takes ownership of protector and to_wrap, and refs leftover_slices. */ +/* Takes ownership of protector, zero_copy_protector, and to_wrap, and refs + * leftover_slices. If zero_copy_protector is not NULL, protector will never be + * used. */ grpc_endpoint *grpc_secure_endpoint_create( - struct tsi_frame_protector *protector, grpc_endpoint *to_wrap, - grpc_slice *leftover_slices, size_t leftover_nslices); + struct tsi_frame_protector *protector, + struct tsi_zero_copy_grpc_protector *zero_copy_protector, + grpc_endpoint *to_wrap, grpc_slice *leftover_slices, + size_t leftover_nslices); #endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */ diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index fc9c9f980f..ea9608f444 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -32,6 +32,7 @@ #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/tsi_error.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/tsi/transport_security_grpc.h" #define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256 @@ -135,17 +136,31 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); goto done; } - // Create frame protector. - tsi_frame_protector *protector; - tsi_result result = tsi_handshaker_result_create_frame_protector( - h->handshaker_result, NULL, &protector); - if (result != TSI_OK) { + // Create zero-copy frame protector, if implemented. + tsi_zero_copy_grpc_protector *zero_copy_protector = NULL; + tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( + h->handshaker_result, NULL, &zero_copy_protector); + if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Zero-copy frame protector creation failed"), result); security_handshake_failed_locked(exec_ctx, h, error); goto done; } + // Create frame protector if zero-copy frame protector is NULL. + tsi_frame_protector *protector = NULL; + if (zero_copy_protector == NULL) { + result = tsi_handshaker_result_create_frame_protector(h->handshaker_result, + NULL, &protector); + if (result != TSI_OK) { + error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Frame protector creation failed"), + result); + security_handshake_failed_locked(exec_ctx, h, error); + goto done; + } + } // Get unused bytes. const unsigned char *unused_bytes = NULL; size_t unused_bytes_size = 0; @@ -155,12 +170,12 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, if (unused_bytes_size > 0) { grpc_slice slice = grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size); - h->args->endpoint = - grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1); + h->args->endpoint = grpc_secure_endpoint_create( + protector, zero_copy_protector, h->args->endpoint, &slice, 1); grpc_slice_unref_internal(exec_ctx, slice); } else { - h->args->endpoint = - grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0); + h->args->endpoint = grpc_secure_endpoint_create( + protector, zero_copy_protector, h->args->endpoint, NULL, 0); } tsi_handshaker_result_destroy(h->handshaker_result); h->handshaker_result = NULL; diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index ec93303024..523e43445b 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -300,11 +300,12 @@ void *gpr_memrchr(const void *s, int c, size_t n) { } bool gpr_is_true(const char *s) { + size_t i; if (s == NULL) { return false; } static const char *truthy[] = {"yes", "true", "1"}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + for (i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == gpr_stricmp(s, truthy[i])) { return true; } diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 4a81d55e6e..d59456a439 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); } static void alarm_unref(grpc_alarm *alarm) { if (gpr_unref(&alarm->refs)) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + if (alarm->cq != NULL) { + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + } grpc_exec_ctx_finish(&exec_ctx); gpr_free(alarm); } @@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { (void *)alarm, &alarm->completion); } -grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, - void *tag) { +grpc_alarm *grpc_alarm_create(void *reserved) { grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - gpr_ref_init(&alarm->refs, 1); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { @@ -106,26 +104,35 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, } #endif + gpr_ref_init(&alarm->refs, 1); + grpc_timer_init_unset(&alarm->alarm); + alarm->cq = NULL; + GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, + grpc_schedule_on_exec_ctx); + return alarm; +} + +void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, + gpr_timespec deadline, void *tag, void *reserved) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; alarm->tag = tag; GPR_ASSERT(grpc_cq_begin_op(cq, tag)); - GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, - grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm); grpc_exec_ctx_finish(&exec_ctx); - return alarm; } -void grpc_alarm_cancel(grpc_alarm *alarm) { +void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_timer_cancel(&exec_ctx, &alarm->alarm); grpc_exec_ctx_finish(&exec_ctx); } -void grpc_alarm_destroy(grpc_alarm *alarm) { - grpc_alarm_cancel(alarm); +void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) { + grpc_alarm_cancel(alarm, reserved); GRPC_ALARM_UNREF(alarm, "alarm_destroy"); } diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 96c16105e7..fd6ea4daa9 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -21,6 +21,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "4.0.0-dev"; } +const char *grpc_version_string(void) { return "5.0.0-dev"; } const char *grpc_g_stands_for(void) { return "gambit"; } diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c index 322ebea111..1c09f54ad9 100644 --- a/src/core/plugin_registry/grpc_cronet_plugin_registry.c +++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c @@ -28,8 +28,8 @@ extern void grpc_client_channel_init(void); extern void grpc_client_channel_shutdown(void); extern void grpc_tsi_gts_init(void); extern void grpc_tsi_gts_shutdown(void); -extern void grpc_load_reporting_plugin_init(void); -extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_server_load_reporting_plugin_init(void); +extern void grpc_server_load_reporting_plugin_shutdown(void); void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_http_filters_init, @@ -42,6 +42,6 @@ void grpc_register_built_in_plugins(void) { grpc_client_channel_shutdown); grpc_register_plugin(grpc_tsi_gts_init, grpc_tsi_gts_shutdown); - grpc_register_plugin(grpc_load_reporting_plugin_init, - grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_server_load_reporting_plugin_init, + grpc_server_load_reporting_plugin_shutdown); } diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c index fa9974952c..9cacf3d306 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.c +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -44,8 +44,8 @@ extern void grpc_resolver_dns_native_init(void); extern void grpc_resolver_dns_native_shutdown(void); extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); -extern void grpc_load_reporting_plugin_init(void); -extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_server_load_reporting_plugin_init(void); +extern void grpc_server_load_reporting_plugin_shutdown(void); extern void census_grpc_plugin_init(void); extern void census_grpc_plugin_shutdown(void); extern void grpc_max_age_filter_init(void); @@ -82,8 +82,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_dns_native_shutdown); grpc_register_plugin(grpc_resolver_sockaddr_init, grpc_resolver_sockaddr_shutdown); - grpc_register_plugin(grpc_load_reporting_plugin_init, - grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_server_load_reporting_plugin_init, + grpc_server_load_reporting_plugin_shutdown); grpc_register_plugin(census_grpc_plugin_init, census_grpc_plugin_shutdown); grpc_register_plugin(grpc_max_age_filter_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index 7eb599d81a..7b90d796d5 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -36,8 +36,8 @@ extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); extern void grpc_resolver_fake_init(void); extern void grpc_resolver_fake_shutdown(void); -extern void grpc_load_reporting_plugin_init(void); -extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_server_load_reporting_plugin_init(void); +extern void grpc_server_load_reporting_plugin_shutdown(void); extern void grpc_lb_policy_grpclb_init(void); extern void grpc_lb_policy_grpclb_shutdown(void); extern void grpc_lb_policy_pick_first_init(void); @@ -72,8 +72,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_sockaddr_shutdown); grpc_register_plugin(grpc_resolver_fake_init, grpc_resolver_fake_shutdown); - grpc_register_plugin(grpc_load_reporting_plugin_init, - grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_server_load_reporting_plugin_init, + grpc_server_load_reporting_plugin_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 967126ecee..e7b3be3d86 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -25,7 +25,8 @@ #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/useful.h> -#include "src/core/tsi/transport_security.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/tsi/transport_security_grpc.h" /* --- Constants. ---*/ #define TSI_FAKE_FRAME_HEADER_SIZE 4 @@ -74,6 +75,14 @@ typedef struct { size_t max_frame_size; } tsi_fake_frame_protector; +typedef struct { + tsi_zero_copy_grpc_protector base; + grpc_slice_buffer header_sb; + grpc_slice_buffer protected_sb; + size_t max_frame_size; + size_t parsed_frame_size; +} tsi_fake_zero_copy_grpc_protector; + /* --- Utils. ---*/ static const char *tsi_fake_handshake_message_strings[] = { @@ -113,6 +122,28 @@ static void store32_little_endian(uint32_t value, unsigned char *buf) { buf[0] = (unsigned char)((value)&0xFF); } +static uint32_t read_frame_size(const grpc_slice_buffer *sb) { + GPR_ASSERT(sb != NULL && sb->length >= TSI_FAKE_FRAME_HEADER_SIZE); + uint8_t frame_size_buffer[TSI_FAKE_FRAME_HEADER_SIZE]; + uint8_t *buf = frame_size_buffer; + /* Copies the first 4 bytes to a temporary buffer. */ + size_t remaining = TSI_FAKE_FRAME_HEADER_SIZE; + for (size_t i = 0; i < sb->count; i++) { + size_t slice_length = GRPC_SLICE_LENGTH(sb->slices[i]); + if (remaining <= slice_length) { + memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), remaining); + remaining = 0; + break; + } else { + memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), slice_length); + buf += slice_length; + remaining -= slice_length; + } + } + GPR_ASSERT(remaining == 0); + return load32_little_endian(frame_size_buffer); +} + static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) { frame->offset = 0; frame->needs_draining = needs_draining; @@ -363,6 +394,84 @@ static const tsi_frame_protector_vtable frame_protector_vtable = { fake_protector_unprotect, fake_protector_destroy, }; +/* --- tsi_zero_copy_grpc_protector methods implementation. ---*/ + +static tsi_result fake_zero_copy_grpc_protector_protect( + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, + grpc_slice_buffer *protected_slices) { + if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) { + return TSI_INVALID_ARGUMENT; + } + tsi_fake_zero_copy_grpc_protector *impl = + (tsi_fake_zero_copy_grpc_protector *)self; + /* Protects each frame. */ + while (unprotected_slices->length > 0) { + size_t frame_length = + GPR_MIN(impl->max_frame_size, + unprotected_slices->length + TSI_FAKE_FRAME_HEADER_SIZE); + grpc_slice slice = GRPC_SLICE_MALLOC(TSI_FAKE_FRAME_HEADER_SIZE); + store32_little_endian((uint32_t)frame_length, GRPC_SLICE_START_PTR(slice)); + grpc_slice_buffer_add(protected_slices, slice); + size_t data_length = frame_length - TSI_FAKE_FRAME_HEADER_SIZE; + grpc_slice_buffer_move_first(unprotected_slices, data_length, + protected_slices); + } + return TSI_OK; +} + +static tsi_result fake_zero_copy_grpc_protector_unprotect( + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, + grpc_slice_buffer *unprotected_slices) { + if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) { + return TSI_INVALID_ARGUMENT; + } + tsi_fake_zero_copy_grpc_protector *impl = + (tsi_fake_zero_copy_grpc_protector *)self; + grpc_slice_buffer_move_into(protected_slices, &impl->protected_sb); + /* Unprotect each frame, if we get a full frame. */ + while (impl->protected_sb.length >= TSI_FAKE_FRAME_HEADER_SIZE) { + if (impl->parsed_frame_size == 0) { + impl->parsed_frame_size = read_frame_size(&impl->protected_sb); + if (impl->parsed_frame_size <= 4) { + gpr_log(GPR_ERROR, "Invalid frame size."); + return TSI_DATA_CORRUPTED; + } + } + /* If we do not have a full frame, return with OK status. */ + if (impl->protected_sb.length < impl->parsed_frame_size) break; + /* Strips header bytes. */ + grpc_slice_buffer_move_first(&impl->protected_sb, + TSI_FAKE_FRAME_HEADER_SIZE, &impl->header_sb); + /* Moves data to unprotected slices. */ + grpc_slice_buffer_move_first( + &impl->protected_sb, + impl->parsed_frame_size - TSI_FAKE_FRAME_HEADER_SIZE, + unprotected_slices); + impl->parsed_frame_size = 0; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &impl->header_sb); + } + return TSI_OK; +} + +static void fake_zero_copy_grpc_protector_destroy( + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self) { + if (self == NULL) return; + tsi_fake_zero_copy_grpc_protector *impl = + (tsi_fake_zero_copy_grpc_protector *)self; + grpc_slice_buffer_destroy_internal(exec_ctx, &impl->header_sb); + grpc_slice_buffer_destroy_internal(exec_ctx, &impl->protected_sb); + gpr_free(impl); +} + +static const tsi_zero_copy_grpc_protector_vtable + zero_copy_grpc_protector_vtable = { + fake_zero_copy_grpc_protector_protect, + fake_zero_copy_grpc_protector_unprotect, + fake_zero_copy_grpc_protector_destroy, +}; + /* --- tsi_handshaker_result methods implementation. ---*/ typedef struct { @@ -383,6 +492,14 @@ static tsi_result fake_handshaker_result_extract_peer( return result; } +static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_zero_copy_grpc_protector **protector) { + *protector = + tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size); + return TSI_OK; +} + static tsi_result fake_handshaker_result_create_frame_protector( const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, tsi_frame_protector **protector) { @@ -407,7 +524,7 @@ static void fake_handshaker_result_destroy(tsi_handshaker_result *self) { static const tsi_handshaker_result_vtable handshaker_result_vtable = { fake_handshaker_result_extract_peer, - NULL, /* create_zero_copy_grpc_protector */ + fake_handshaker_result_create_zero_copy_grpc_protector, fake_handshaker_result_create_frame_protector, fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy, @@ -631,3 +748,16 @@ tsi_frame_protector *tsi_create_fake_frame_protector( impl->base.vtable = &frame_protector_vtable; return &impl->base; } + +tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector( + size_t *max_protected_frame_size) { + tsi_fake_zero_copy_grpc_protector *impl = gpr_zalloc(sizeof(*impl)); + grpc_slice_buffer_init(&impl->header_sb); + grpc_slice_buffer_init(&impl->protected_sb); + impl->max_frame_size = (max_protected_frame_size == NULL) + ? TSI_FAKE_DEFAULT_FRAME_SIZE + : *max_protected_frame_size; + impl->parsed_frame_size = 0; + impl->base.vtable = &zero_copy_grpc_protector_vtable; + return &impl->base; +} diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h index 934b3cbeb2..6159708a84 100644 --- a/src/core/tsi/fake_transport_security.h +++ b/src/core/tsi/fake_transport_security.h @@ -39,6 +39,11 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client); tsi_frame_protector *tsi_create_fake_frame_protector( size_t *max_protected_frame_size); +/* Creates a zero-copy protector directly without going through the handshake + * phase. */ +tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector( + size_t *max_protected_frame_size); + #ifdef __cplusplus } #endif diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c index 5bcfdfa61f..773b35e717 100644 --- a/src/core/tsi/transport_security_grpc.c +++ b/src/core/tsi/transport_security_grpc.c @@ -37,28 +37,33 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( Calls specific implementation after state/input validation. */ tsi_result tsi_zero_copy_grpc_protector_protect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices) { - if (self == NULL || self->vtable == NULL || unprotected_slices == NULL || - protected_slices == NULL) { + if (exec_ctx == NULL || self == NULL || self->vtable == NULL || + unprotected_slices == NULL || protected_slices == NULL) { return TSI_INVALID_ARGUMENT; } if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED; - return self->vtable->protect(self, unprotected_slices, protected_slices); + return self->vtable->protect(exec_ctx, self, unprotected_slices, + protected_slices); } tsi_result tsi_zero_copy_grpc_protector_unprotect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices) { - if (self == NULL || self->vtable == NULL || protected_slices == NULL || - unprotected_slices == NULL) { + if (exec_ctx == NULL || self == NULL || self->vtable == NULL || + protected_slices == NULL || unprotected_slices == NULL) { return TSI_INVALID_ARGUMENT; } if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED; - return self->vtable->unprotect(self, protected_slices, unprotected_slices); + return self->vtable->unprotect(exec_ctx, self, protected_slices, + unprotected_slices); } -void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self) { +void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self) { if (self == NULL) return; - self->vtable->destroy(self); + self->vtable->destroy(exec_ctx, self); } diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h index 5ab5297cc4..375a758888 100644 --- a/src/core/tsi/transport_security_grpc.h +++ b/src/core/tsi/transport_security_grpc.h @@ -42,8 +42,8 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( - This method returns TSI_OK in case of success or a specific error code in case of failure. */ tsi_result tsi_zero_copy_grpc_protector_protect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, - grpc_slice_buffer *protected_slices); + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices); /* Outputs unprotected bytes. - protected_slices is the bytes of protected frames. @@ -52,21 +52,24 @@ tsi_result tsi_zero_copy_grpc_protector_protect( there is not enough data to output in which case unprotected_slices has 0 bytes. */ tsi_result tsi_zero_copy_grpc_protector_unprotect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, - grpc_slice_buffer *unprotected_slices); + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices); /* Destroys the tsi_zero_copy_grpc_protector object. */ -void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self); +void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self); /* Base for tsi_zero_copy_grpc_protector implementations. */ typedef struct { - tsi_result (*protect)(tsi_zero_copy_grpc_protector *self, + tsi_result (*protect)(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices); - tsi_result (*unprotect)(tsi_zero_copy_grpc_protector *self, + tsi_result (*unprotect)(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices); - void (*destroy)(tsi_zero_copy_grpc_protector *self); + void (*destroy)(grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self); } tsi_zero_copy_grpc_protector_vtable; struct tsi_zero_copy_grpc_protector { diff --git a/src/cpp/util/core_stats.cc b/src/cpp/util/core_stats.cc new file mode 100644 index 0000000000..edf0b1bb67 --- /dev/null +++ b/src/cpp/util/core_stats.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/cpp/util/core_stats.h" + +#include <grpc/support/log.h> + +using grpc::core::Bucket; +using grpc::core::Histogram; +using grpc::core::Metric; +using grpc::core::Stats; + +namespace grpc { + +void CoreStatsToProto(const grpc_stats_data& core, Stats* proto) { + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + Metric* m = proto->add_metrics(); + m->set_name(grpc_stats_counter_name[i]); + m->set_count(core.counters[i]); + } + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + Metric* m = proto->add_metrics(); + m->set_name(grpc_stats_histogram_name[i]); + Histogram* h = m->mutable_histogram(); + for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) { + Bucket* b = h->add_buckets(); + b->set_start(grpc_stats_histo_bucket_boundaries[i][j]); + b->set_count(core.histograms[grpc_stats_histo_start[i] + j]); + } + } +} + +void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core) { + memset(core, 0, sizeof(*core)); + for (const auto& m : proto.metrics()) { + switch (m.value_case()) { + case Metric::VALUE_NOT_SET: + break; + case Metric::kCount: + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + if (m.name() == grpc_stats_counter_name[i]) { + core->counters[i] = m.count(); + break; + } + } + break; + case Metric::kHistogram: + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + if (m.name() == grpc_stats_histogram_name[i]) { + const auto& h = m.histogram(); + bool valid = true; + if (grpc_stats_histo_buckets[i] != h.buckets_size()) valid = false; + for (int j = 0; valid && j < h.buckets_size(); j++) { + if (grpc_stats_histo_bucket_boundaries[i][j] != + h.buckets(j).start()) { + valid = false; + } + } + if (!valid) { + gpr_log(GPR_ERROR, + "Found histogram %s but shape is different from proto", + m.name().c_str()); + } + for (int j = 0; valid && j < h.buckets_size(); j++) { + core->histograms[grpc_stats_histo_start[i] + j] = + h.buckets(j).count(); + } + } + } + break; + } + } +} + +} // namespace grpc diff --git a/src/cpp/util/core_stats.h b/src/cpp/util/core_stats.h new file mode 100644 index 0000000000..00e38bf266 --- /dev/null +++ b/src/cpp/util/core_stats.h @@ -0,0 +1,35 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H +#define GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H + +#include "src/proto/grpc/core/stats.pb.h" + +extern "C" { +#include "src/core/lib/debug/stats.h" +} + +namespace grpc { + +void CoreStatsToProto(const grpc_stats_data& core, grpc::core::Stats* proto); +void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core); + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H diff --git a/src/proto/grpc/core/BUILD b/src/proto/grpc/core/BUILD new file mode 100644 index 0000000000..46de9fae18 --- /dev/null +++ b/src/proto/grpc/core/BUILD @@ -0,0 +1,24 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) # Apache v2 + +load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package") + +grpc_package(name = "core", visibility = "public") + +grpc_proto_library( + name = "stats_proto", + srcs = ["stats.proto"], +) diff --git a/src/proto/grpc/core/stats.proto b/src/proto/grpc/core/stats.proto new file mode 100644 index 0000000000..ac181b0439 --- /dev/null +++ b/src/proto/grpc/core/stats.proto @@ -0,0 +1,38 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.core; + +message Bucket { + double start = 1; + uint64 count = 2; +} + +message Histogram { + repeated Bucket buckets = 1; +} + +message Metric { + string name = 1; + oneof value { + uint64 count = 10; + Histogram histogram = 11; + } +} + +message Stats { + repeated Metric metrics = 1; +} diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index 07e08117f0..36d3782262 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -84,6 +84,9 @@ grpc_proto_library( name = "stats_proto", srcs = ["stats.proto"], has_services = False, + deps = [ + "//src/proto/grpc/core:stats_proto", + ] ) grpc_proto_library( diff --git a/src/proto/grpc/testing/stats.proto b/src/proto/grpc/testing/stats.proto index c738c4f895..a0f84ddbce 100644 --- a/src/proto/grpc/testing/stats.proto +++ b/src/proto/grpc/testing/stats.proto @@ -16,6 +16,8 @@ syntax = "proto3"; package grpc.testing; +import "src/proto/grpc/core/stats.proto"; + message ServerStats { // wall clock time change in seconds since last reset double time_elapsed = 1; @@ -35,6 +37,9 @@ message ServerStats { // Number of polls called inside completion queue uint64 cq_poll_count = 6; + + // Core library stats + grpc.core.Stats core_stats = 7; } // Histogram params based on grpc/support/histogram.c @@ -72,4 +77,7 @@ message ClientStats { // Number of polls called inside completion queue uint64 cq_poll_count = 6; + + // Core library stats + grpc.core.Stats core_stats = 7; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d0b817b602..859584a413 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -298,8 +298,8 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/census/base_resources.c', 'src/core/ext/census/context.c', 'src/core/ext/census/gen/census.pb.c', diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 0402ce34fb..57b543967e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -88,6 +88,7 @@ grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import; grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import; grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; grpc_alarm_create_type grpc_alarm_create_import; +grpc_alarm_set_type grpc_alarm_set_import; grpc_alarm_cancel_type grpc_alarm_cancel_import; grpc_alarm_destroy_type grpc_alarm_destroy_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; @@ -395,6 +396,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown"); grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy"); grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create"); + grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set"); grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel"); grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy"); grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index e3704e592b..c5c848ae44 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -242,13 +242,16 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq); extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; #define grpc_completion_queue_destroy grpc_completion_queue_destroy_import -typedef grpc_alarm *(*grpc_alarm_create_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *tag); +typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved); extern grpc_alarm_create_type grpc_alarm_create_import; #define grpc_alarm_create grpc_alarm_create_import -typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm); +typedef void(*grpc_alarm_set_type)(grpc_alarm *alarm, grpc_completion_queue *cq, gpr_timespec deadline, void *tag, void *reserved); +extern grpc_alarm_set_type grpc_alarm_set_import; +#define grpc_alarm_set grpc_alarm_set_import +typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm, void *reserved); extern grpc_alarm_cancel_type grpc_alarm_cancel_import; #define grpc_alarm_cancel grpc_alarm_cancel_import -typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm); +typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm, void *reserved); extern grpc_alarm_destroy_type grpc_alarm_destroy_import; #define grpc_alarm_destroy grpc_alarm_destroy_import typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect); |