aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/statistics/census_rpc_stats.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/statistics/census_rpc_stats.c')
-rw-r--r--src/core/statistics/census_rpc_stats.c206
1 files changed, 201 insertions, 5 deletions
diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c
index 28101ac734..a1ac2abff3 100644
--- a/src/core/statistics/census_rpc_stats.c
+++ b/src/core/statistics/census_rpc_stats.c
@@ -35,7 +35,85 @@
#include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_rpc_stats.h"
+#include "src/core/statistics/hash_table.h"
+#include "src/core/statistics/census_tracing.h"
+#include "src/core/statistics/window_stats.h"
+#include "src/core/support/murmur_hash.h"
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include <grpc/support/sync.h>
+
+#define NUM_INTERVALS 3
+#define MINUTE_INTERVAL 0
+#define HOUR_INTERVAL 1
+#define TOTAL_INTERVAL 2
+
+/* for easier typing */
+typedef census_per_method_rpc_stats per_method_stats;
+
+/* Ensure mu is only initialized once. */
+static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT;
+/* Guards two stats stores. */
+static gpr_mu g_mu;
+static census_ht* g_client_stats_store = NULL;
+static census_ht* g_server_stats_store = NULL;
+
+static void init_mutex() { gpr_mu_init(&g_mu); }
+
+static void init_mutex_once() {
+ gpr_once_init(&g_stats_store_mu_init, init_mutex);
+}
+
+static int cmp_str_keys(const void* k1, const void* k2) {
+ return strcmp((const char*)k1, (const char*)k2);
+}
+
+/* TODO(hongyu): replace it with cityhash64 */
+static gpr_uint64 simple_hash(const void* k) {
+ size_t len = strlen(k);
+ gpr_uint64 higher = gpr_murmur_hash3((const char*)k, len / 2, 0);
+ return higher << 32 |
+ gpr_murmur_hash3((const char*)k + len / 2, len - len / 2, 0);
+}
+
+static void delete_stats(void* stats) {
+ census_window_stats_destroy((struct census_window_stats*)stats);
+}
+
+static void delete_key(void* key) { gpr_free(key); }
+
+static const census_ht_option ht_opt = {
+ CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */,
+ simple_hash /* hash function */, cmp_str_keys /* key comparator */,
+ delete_stats /* data deleter */, delete_key /* key deleter */};
+
+static void init_rpc_stats(void* stats) {
+ memset(stats, 0, sizeof(census_rpc_stats));
+}
+
+static void stat_add_proportion(double p, void* base, const void* addme) {
+ census_rpc_stats* b = (census_rpc_stats*)base;
+ census_rpc_stats* a = (census_rpc_stats*)addme;
+ b->cnt += p * a->cnt;
+ b->rpc_error_cnt += p * a->rpc_error_cnt;
+ b->app_error_cnt += p * a->app_error_cnt;
+ b->elapsed_time_ms += p * a->elapsed_time_ms;
+ b->api_request_bytes += p * a->api_request_bytes;
+ b->wire_request_bytes += p * a->wire_request_bytes;
+ b->api_response_bytes += p * a->api_response_bytes;
+ b->wire_response_bytes += p * a->wire_response_bytes;
+}
+
+static void stat_add(void* base, const void* addme) {
+ stat_add_proportion(1.0, base, addme);
+}
+
+static gpr_timespec min_hour_total_intervals[3] = {
+ {60, 0}, {3600, 0}, {36000000, 0}};
+
+static const census_window_stats_stat_info window_stats_settings = {
+ sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion};
census_rpc_stats* census_rpc_stats_create_empty() {
census_rpc_stats* ret =
@@ -44,14 +122,132 @@ census_rpc_stats* census_rpc_stats_create_empty() {
return ret;
}
-void census_aggregated_rpc_stats_destroy(census_aggregated_rpc_stats* data) {}
+void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats* data) {
+ int i = 0;
+ for (i = 0; i < data->num_entries; i++) {
+ if (data->stats[i].method != NULL) {
+ gpr_free((void*)data->stats[i].method);
+ }
+ }
+ if (data->stats != NULL) {
+ gpr_free(data->stats);
+ }
+ data->num_entries = 0;
+ data->stats = NULL;
+}
+
+static void record_stats(census_ht* store, census_op_id op_id,
+ const census_rpc_stats* stats) {
+ gpr_mu_lock(&g_mu);
+ if (store != NULL) {
+ trace_obj* trace = NULL;
+ census_internal_lock_trace_store();
+ trace = census_get_trace_obj_locked(op_id);
+ if (trace != NULL) {
+ const char* method_name = census_get_trace_method_name(trace);
+ struct census_window_stats* window_stats = NULL;
+ census_ht_key key;
+ key.ptr = (void*)method_name;
+ window_stats = census_ht_find(store, key);
+ census_internal_unlock_trace_store();
+ if (window_stats == NULL) {
+ window_stats = census_window_stats_create(3, min_hour_total_intervals,
+ 30, &window_stats_settings);
+ key.ptr = gpr_strdup(key.ptr);
+ census_ht_insert(store, key, (void*)window_stats);
+ }
+ census_window_stats_add(window_stats, gpr_now(), stats);
+ } else {
+ census_internal_unlock_trace_store();
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+}
void census_record_rpc_client_stats(census_op_id op_id,
- const census_rpc_stats* stats) {}
+ const census_rpc_stats* stats) {
+ record_stats(g_client_stats_store, op_id, stats);
+}
void census_record_rpc_server_stats(census_op_id op_id,
- const census_rpc_stats* stats) {}
+ const census_rpc_stats* stats) {
+ record_stats(g_server_stats_store, op_id, stats);
+}
-void census_get_server_stats(census_aggregated_rpc_stats* data) {}
+/* Get stats from input stats store */
+static void get_stats(census_ht* store, census_aggregated_rpc_stats* data) {
+ GPR_ASSERT(data != NULL);
+ if (data->num_entries != 0) {
+ census_aggregated_rpc_stats_set_empty(data);
+ }
+ gpr_mu_lock(&g_mu);
+ if (store != NULL) {
+ size_t n;
+ int i, j;
+ gpr_timespec now = gpr_now();
+ census_ht_kv* kv = census_ht_get_all_elements(store, &n);
+ if (kv != NULL) {
+ data->num_entries = n;
+ data->stats = (per_method_stats*)gpr_malloc(sizeof(per_method_stats) * n);
+ for (i = 0; i < n; i++) {
+ census_window_stats_sums sums[NUM_INTERVALS];
+ for (j = 0; j < NUM_INTERVALS; j++) {
+ sums[j].statistic = (void*)census_rpc_stats_create_empty();
+ }
+ data->stats[i].method = gpr_strdup(kv[i].k.ptr);
+ census_window_stats_get_sums(kv[i].v, now, sums);
+ data->stats[i].minute_stats =
+ *(census_rpc_stats*)sums[MINUTE_INTERVAL].statistic;
+ data->stats[i].hour_stats =
+ *(census_rpc_stats*)sums[HOUR_INTERVAL].statistic;
+ data->stats[i].total_stats =
+ *(census_rpc_stats*)sums[TOTAL_INTERVAL].statistic;
+ for (j = 0; j < NUM_INTERVALS; j++) {
+ gpr_free(sums[j].statistic);
+ }
+ }
+ gpr_free(kv);
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_get_client_stats(census_aggregated_rpc_stats* data) {
+ get_stats(g_client_stats_store, data);
+}
+
+void census_get_server_stats(census_aggregated_rpc_stats* data) {
+ get_stats(g_server_stats_store, data);
+}
+
+void census_stats_store_init() {
+ gpr_log(GPR_INFO, "Initialize census stats store.");
+ init_mutex_once();
+ gpr_mu_lock(&g_mu);
+ if (g_client_stats_store == NULL && g_server_stats_store == NULL) {
+ g_client_stats_store = census_ht_create(&ht_opt);
+ g_server_stats_store = census_ht_create(&ht_opt);
+ } else {
+ gpr_log(GPR_ERROR, "Census stats store already initialized.");
+ }
+ gpr_mu_unlock(&g_mu);
+}
-void census_get_client_stats(census_aggregated_rpc_stats* data) {}
+void census_stats_store_shutdown() {
+ gpr_log(GPR_INFO, "Shutdown census stats store.");
+ init_mutex_once();
+ gpr_mu_lock(&g_mu);
+ if (g_client_stats_store != NULL) {
+ census_ht_destroy(g_client_stats_store);
+ g_client_stats_store = NULL;
+ } else {
+ gpr_log(GPR_ERROR, "Census server stats store not initialized.");
+ }
+ if (g_server_stats_store != NULL) {
+ census_ht_destroy(g_server_stats_store);
+ g_server_stats_store = NULL;
+ } else {
+ gpr_log(GPR_ERROR, "Census client stats store not initialized.");
+ }
+ gpr_mu_unlock(&g_mu);
+}