diff options
Diffstat (limited to 'src/core/statistics/census_rpc_stats.c')
-rw-r--r-- | src/core/statistics/census_rpc_stats.c | 206 |
1 files changed, 201 insertions, 5 deletions
diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c index 28101ac734..a1ac2abff3 100644 --- a/src/core/statistics/census_rpc_stats.c +++ b/src/core/statistics/census_rpc_stats.c @@ -35,7 +35,85 @@ #include "src/core/statistics/census_interface.h" #include "src/core/statistics/census_rpc_stats.h" +#include "src/core/statistics/hash_table.h" +#include "src/core/statistics/census_tracing.h" +#include "src/core/statistics/window_stats.h" +#include "src/core/support/murmur_hash.h" #include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> + +#define NUM_INTERVALS 3 +#define MINUTE_INTERVAL 0 +#define HOUR_INTERVAL 1 +#define TOTAL_INTERVAL 2 + +/* for easier typing */ +typedef census_per_method_rpc_stats per_method_stats; + +/* Ensure mu is only initialized once. */ +static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT; +/* Guards two stats stores. */ +static gpr_mu g_mu; +static census_ht* g_client_stats_store = NULL; +static census_ht* g_server_stats_store = NULL; + +static void init_mutex() { gpr_mu_init(&g_mu); } + +static void init_mutex_once() { + gpr_once_init(&g_stats_store_mu_init, init_mutex); +} + +static int cmp_str_keys(const void* k1, const void* k2) { + return strcmp((const char*)k1, (const char*)k2); +} + +/* TODO(hongyu): replace it with cityhash64 */ +static gpr_uint64 simple_hash(const void* k) { + size_t len = strlen(k); + gpr_uint64 higher = gpr_murmur_hash3((const char*)k, len / 2, 0); + return higher << 32 | + gpr_murmur_hash3((const char*)k + len / 2, len - len / 2, 0); +} + +static void delete_stats(void* stats) { + census_window_stats_destroy((struct census_window_stats*)stats); +} + +static void delete_key(void* key) { gpr_free(key); } + +static const census_ht_option ht_opt = { + CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */, + simple_hash /* hash function */, cmp_str_keys /* key comparator */, + delete_stats /* data deleter */, delete_key /* key deleter */}; + +static void init_rpc_stats(void* stats) { + memset(stats, 0, sizeof(census_rpc_stats)); +} + +static void stat_add_proportion(double p, void* base, const void* addme) { + census_rpc_stats* b = (census_rpc_stats*)base; + census_rpc_stats* a = (census_rpc_stats*)addme; + b->cnt += p * a->cnt; + b->rpc_error_cnt += p * a->rpc_error_cnt; + b->app_error_cnt += p * a->app_error_cnt; + b->elapsed_time_ms += p * a->elapsed_time_ms; + b->api_request_bytes += p * a->api_request_bytes; + b->wire_request_bytes += p * a->wire_request_bytes; + b->api_response_bytes += p * a->api_response_bytes; + b->wire_response_bytes += p * a->wire_response_bytes; +} + +static void stat_add(void* base, const void* addme) { + stat_add_proportion(1.0, base, addme); +} + +static gpr_timespec min_hour_total_intervals[3] = { + {60, 0}, {3600, 0}, {36000000, 0}}; + +static const census_window_stats_stat_info window_stats_settings = { + sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion}; census_rpc_stats* census_rpc_stats_create_empty() { census_rpc_stats* ret = @@ -44,14 +122,132 @@ census_rpc_stats* census_rpc_stats_create_empty() { return ret; } -void census_aggregated_rpc_stats_destroy(census_aggregated_rpc_stats* data) {} +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats* data) { + int i = 0; + for (i = 0; i < data->num_entries; i++) { + if (data->stats[i].method != NULL) { + gpr_free((void*)data->stats[i].method); + } + } + if (data->stats != NULL) { + gpr_free(data->stats); + } + data->num_entries = 0; + data->stats = NULL; +} + +static void record_stats(census_ht* store, census_op_id op_id, + const census_rpc_stats* stats) { + gpr_mu_lock(&g_mu); + if (store != NULL) { + trace_obj* trace = NULL; + census_internal_lock_trace_store(); + trace = census_get_trace_obj_locked(op_id); + if (trace != NULL) { + const char* method_name = census_get_trace_method_name(trace); + struct census_window_stats* window_stats = NULL; + census_ht_key key; + key.ptr = (void*)method_name; + window_stats = census_ht_find(store, key); + census_internal_unlock_trace_store(); + if (window_stats == NULL) { + window_stats = census_window_stats_create(3, min_hour_total_intervals, + 30, &window_stats_settings); + key.ptr = gpr_strdup(key.ptr); + census_ht_insert(store, key, (void*)window_stats); + } + census_window_stats_add(window_stats, gpr_now(), stats); + } else { + census_internal_unlock_trace_store(); + } + } + gpr_mu_unlock(&g_mu); +} void census_record_rpc_client_stats(census_op_id op_id, - const census_rpc_stats* stats) {} + const census_rpc_stats* stats) { + record_stats(g_client_stats_store, op_id, stats); +} void census_record_rpc_server_stats(census_op_id op_id, - const census_rpc_stats* stats) {} + const census_rpc_stats* stats) { + record_stats(g_server_stats_store, op_id, stats); +} -void census_get_server_stats(census_aggregated_rpc_stats* data) {} +/* Get stats from input stats store */ +static void get_stats(census_ht* store, census_aggregated_rpc_stats* data) { + GPR_ASSERT(data != NULL); + if (data->num_entries != 0) { + census_aggregated_rpc_stats_set_empty(data); + } + gpr_mu_lock(&g_mu); + if (store != NULL) { + size_t n; + int i, j; + gpr_timespec now = gpr_now(); + census_ht_kv* kv = census_ht_get_all_elements(store, &n); + if (kv != NULL) { + data->num_entries = n; + data->stats = (per_method_stats*)gpr_malloc(sizeof(per_method_stats) * n); + for (i = 0; i < n; i++) { + census_window_stats_sums sums[NUM_INTERVALS]; + for (j = 0; j < NUM_INTERVALS; j++) { + sums[j].statistic = (void*)census_rpc_stats_create_empty(); + } + data->stats[i].method = gpr_strdup(kv[i].k.ptr); + census_window_stats_get_sums(kv[i].v, now, sums); + data->stats[i].minute_stats = + *(census_rpc_stats*)sums[MINUTE_INTERVAL].statistic; + data->stats[i].hour_stats = + *(census_rpc_stats*)sums[HOUR_INTERVAL].statistic; + data->stats[i].total_stats = + *(census_rpc_stats*)sums[TOTAL_INTERVAL].statistic; + for (j = 0; j < NUM_INTERVALS; j++) { + gpr_free(sums[j].statistic); + } + } + gpr_free(kv); + } + } + gpr_mu_unlock(&g_mu); +} + +void census_get_client_stats(census_aggregated_rpc_stats* data) { + get_stats(g_client_stats_store, data); +} + +void census_get_server_stats(census_aggregated_rpc_stats* data) { + get_stats(g_server_stats_store, data); +} + +void census_stats_store_init() { + gpr_log(GPR_INFO, "Initialize census stats store."); + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store == NULL && g_server_stats_store == NULL) { + g_client_stats_store = census_ht_create(&ht_opt); + g_server_stats_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census stats store already initialized."); + } + gpr_mu_unlock(&g_mu); +} -void census_get_client_stats(census_aggregated_rpc_stats* data) {} +void census_stats_store_shutdown() { + gpr_log(GPR_INFO, "Shutdown census stats store."); + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store != NULL) { + census_ht_destroy(g_client_stats_store); + g_client_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census server stats store not initialized."); + } + if (g_server_stats_store != NULL) { + census_ht_destroy(g_server_stats_store); + g_server_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census client stats store not initialized."); + } + gpr_mu_unlock(&g_mu); +} |