diff options
| field | value | date |
|---|---|---|
| author | hongyu <hongyu@google.com> | 2015-01-08 15:13:49 -0800 |
| committer | Nicolas Noble <nnoble@google.com> | 2015-01-09 17:52:43 -0800 |
| commit | 24200d3cbca2a08c7a2b15b93f1c63efb786367d (patch) | |
| tree | d77e594a01ae514b081cd8531ef357614bbaee65 /src | |
| parent | 6edb547c99e42d5bf0dbea540883728a09066d4b (diff) | |
C implementation of Census trace store and stats store for grpc C lib.
Change on 2015/01/08 by hongyu <hongyu@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83556470
Diffstat (limited to 'src')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/core/channel/census_filter.c | 18 |
| -rw-r--r-- | src/core/statistics/census_init.c | 17 |
| -rw-r--r-- | src/core/statistics/census_rpc_stats.c | 206 |
| -rw-r--r-- | src/core/statistics/census_rpc_stats.h | 26 |
| -rw-r--r-- | src/core/statistics/census_tracing.c | 149 |
| -rw-r--r-- | src/core/statistics/census_tracing.h | 59 |
| -rw-r--r-- | src/core/support/time.c | 4 |
7 files changed, 453 insertions, 26 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index d610a6fc9d..2799bded8a 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -60,13 +60,11 @@ static void init_rpc_stats(census_rpc_stats* stats) { stats->cnt = 1; } -static double gpr_timespec_to_micros(gpr_timespec t) { - return t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3; -} - static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld, channel_data* chand) { if (op->data.metadata->key == chand->path_str) { + gpr_log(GPR_DEBUG, + (const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice)); census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( op->data.metadata->value->slice)); } @@ -78,7 +76,7 @@ static void client_call_op(grpc_call_element* elem, channel_data* chand = elem->channel_data; GPR_ASSERT(calld != NULL); GPR_ASSERT(chand != NULL); - GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0)); + GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); switch (op->type) { case GRPC_SEND_METADATA: extract_and_annotate_method_tag(op, calld, chand); @@ -99,7 +97,7 @@ static void server_call_op(grpc_call_element* elem, channel_data* chand = elem->channel_data; GPR_ASSERT(calld != NULL); GPR_ASSERT(chand != NULL); - GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0)); + GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); switch (op->type) { case GRPC_RECV_METADATA: extract_and_annotate_method_tag(op, calld, chand); @@ -171,7 +169,13 @@ static void init_channel_elem(grpc_channel_element* elem, chand->path_str = grpc_mdstr_from_string(mdctx, ":path"); } -static void destroy_channel_elem(grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) { + channel_data* chand = elem->channel_data; + GPR_ASSERT(chand != NULL); + if (chand->path_str != NULL) { + grpc_mdstr_unref(chand->path_str); + } +} const 
grpc_channel_filter grpc_client_census_filter = { client_call_op, channel_op, diff --git a/src/core/statistics/census_init.c b/src/core/statistics/census_init.c index 340214f8f5..bcb9ff9ad4 100644 --- a/src/core/statistics/census_init.c +++ b/src/core/statistics/census_init.c @@ -33,5 +33,18 @@ #include "src/core/statistics/census_interface.h" -void census_init() {} -void census_shutdown() {} +#include <grpc/support/log.h> +#include "src/core/statistics/census_rpc_stats.h" +#include "src/core/statistics/census_tracing.h" + +void census_init() { + gpr_log(GPR_INFO, "Initialize census library."); + census_tracing_init(); + census_stats_store_init(); +} + +void census_shutdown() { + gpr_log(GPR_INFO, "Shutdown census library."); + census_stats_store_shutdown(); + census_tracing_shutdown(); +} diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c index 28101ac734..a1ac2abff3 100644 --- a/src/core/statistics/census_rpc_stats.c +++ b/src/core/statistics/census_rpc_stats.c @@ -35,7 +35,85 @@ #include "src/core/statistics/census_interface.h" #include "src/core/statistics/census_rpc_stats.h" +#include "src/core/statistics/hash_table.h" +#include "src/core/statistics/census_tracing.h" +#include "src/core/statistics/window_stats.h" +#include "src/core/support/murmur_hash.h" #include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> + +#define NUM_INTERVALS 3 +#define MINUTE_INTERVAL 0 +#define HOUR_INTERVAL 1 +#define TOTAL_INTERVAL 2 + +/* for easier typing */ +typedef census_per_method_rpc_stats per_method_stats; + +/* Ensure mu is only initialized once. */ +static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT; +/* Guards two stats stores. 
*/ +static gpr_mu g_mu; +static census_ht* g_client_stats_store = NULL; +static census_ht* g_server_stats_store = NULL; + +static void init_mutex() { gpr_mu_init(&g_mu); } + +static void init_mutex_once() { + gpr_once_init(&g_stats_store_mu_init, init_mutex); +} + +static int cmp_str_keys(const void* k1, const void* k2) { + return strcmp((const char*)k1, (const char*)k2); +} + +/* TODO(hongyu): replace it with cityhash64 */ +static gpr_uint64 simple_hash(const void* k) { + size_t len = strlen(k); + gpr_uint64 higher = gpr_murmur_hash3((const char*)k, len / 2, 0); + return higher << 32 | + gpr_murmur_hash3((const char*)k + len / 2, len - len / 2, 0); +} + +static void delete_stats(void* stats) { + census_window_stats_destroy((struct census_window_stats*)stats); +} + +static void delete_key(void* key) { gpr_free(key); } + +static const census_ht_option ht_opt = { + CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */, + simple_hash /* hash function */, cmp_str_keys /* key comparator */, + delete_stats /* data deleter */, delete_key /* key deleter */}; + +static void init_rpc_stats(void* stats) { + memset(stats, 0, sizeof(census_rpc_stats)); +} + +static void stat_add_proportion(double p, void* base, const void* addme) { + census_rpc_stats* b = (census_rpc_stats*)base; + census_rpc_stats* a = (census_rpc_stats*)addme; + b->cnt += p * a->cnt; + b->rpc_error_cnt += p * a->rpc_error_cnt; + b->app_error_cnt += p * a->app_error_cnt; + b->elapsed_time_ms += p * a->elapsed_time_ms; + b->api_request_bytes += p * a->api_request_bytes; + b->wire_request_bytes += p * a->wire_request_bytes; + b->api_response_bytes += p * a->api_response_bytes; + b->wire_response_bytes += p * a->wire_response_bytes; +} + +static void stat_add(void* base, const void* addme) { + stat_add_proportion(1.0, base, addme); +} + +static gpr_timespec min_hour_total_intervals[3] = { + {60, 0}, {3600, 0}, {36000000, 0}}; + +static const census_window_stats_stat_info window_stats_settings = { + 
sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion}; census_rpc_stats* census_rpc_stats_create_empty() { census_rpc_stats* ret = @@ -44,14 +122,132 @@ census_rpc_stats* census_rpc_stats_create_empty() { return ret; } -void census_aggregated_rpc_stats_destroy(census_aggregated_rpc_stats* data) {} +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats* data) { + int i = 0; + for (i = 0; i < data->num_entries; i++) { + if (data->stats[i].method != NULL) { + gpr_free((void*)data->stats[i].method); + } + } + if (data->stats != NULL) { + gpr_free(data->stats); + } + data->num_entries = 0; + data->stats = NULL; +} + +static void record_stats(census_ht* store, census_op_id op_id, + const census_rpc_stats* stats) { + gpr_mu_lock(&g_mu); + if (store != NULL) { + trace_obj* trace = NULL; + census_internal_lock_trace_store(); + trace = census_get_trace_obj_locked(op_id); + if (trace != NULL) { + const char* method_name = census_get_trace_method_name(trace); + struct census_window_stats* window_stats = NULL; + census_ht_key key; + key.ptr = (void*)method_name; + window_stats = census_ht_find(store, key); + census_internal_unlock_trace_store(); + if (window_stats == NULL) { + window_stats = census_window_stats_create(3, min_hour_total_intervals, + 30, &window_stats_settings); + key.ptr = gpr_strdup(key.ptr); + census_ht_insert(store, key, (void*)window_stats); + } + census_window_stats_add(window_stats, gpr_now(), stats); + } else { + census_internal_unlock_trace_store(); + } + } + gpr_mu_unlock(&g_mu); +} void census_record_rpc_client_stats(census_op_id op_id, - const census_rpc_stats* stats) {} + const census_rpc_stats* stats) { + record_stats(g_client_stats_store, op_id, stats); +} void census_record_rpc_server_stats(census_op_id op_id, - const census_rpc_stats* stats) {} + const census_rpc_stats* stats) { + record_stats(g_server_stats_store, op_id, stats); +} -void census_get_server_stats(census_aggregated_rpc_stats* data) {} +/* Get 
stats from input stats store */ +static void get_stats(census_ht* store, census_aggregated_rpc_stats* data) { + GPR_ASSERT(data != NULL); + if (data->num_entries != 0) { + census_aggregated_rpc_stats_set_empty(data); + } + gpr_mu_lock(&g_mu); + if (store != NULL) { + size_t n; + int i, j; + gpr_timespec now = gpr_now(); + census_ht_kv* kv = census_ht_get_all_elements(store, &n); + if (kv != NULL) { + data->num_entries = n; + data->stats = (per_method_stats*)gpr_malloc(sizeof(per_method_stats) * n); + for (i = 0; i < n; i++) { + census_window_stats_sums sums[NUM_INTERVALS]; + for (j = 0; j < NUM_INTERVALS; j++) { + sums[j].statistic = (void*)census_rpc_stats_create_empty(); + } + data->stats[i].method = gpr_strdup(kv[i].k.ptr); + census_window_stats_get_sums(kv[i].v, now, sums); + data->stats[i].minute_stats = + *(census_rpc_stats*)sums[MINUTE_INTERVAL].statistic; + data->stats[i].hour_stats = + *(census_rpc_stats*)sums[HOUR_INTERVAL].statistic; + data->stats[i].total_stats = + *(census_rpc_stats*)sums[TOTAL_INTERVAL].statistic; + for (j = 0; j < NUM_INTERVALS; j++) { + gpr_free(sums[j].statistic); + } + } + gpr_free(kv); + } + } + gpr_mu_unlock(&g_mu); +} + +void census_get_client_stats(census_aggregated_rpc_stats* data) { + get_stats(g_client_stats_store, data); +} + +void census_get_server_stats(census_aggregated_rpc_stats* data) { + get_stats(g_server_stats_store, data); +} + +void census_stats_store_init() { + gpr_log(GPR_INFO, "Initialize census stats store."); + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store == NULL && g_server_stats_store == NULL) { + g_client_stats_store = census_ht_create(&ht_opt); + g_server_stats_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census stats store already initialized."); + } + gpr_mu_unlock(&g_mu); +} -void census_get_client_stats(census_aggregated_rpc_stats* data) {} +void census_stats_store_shutdown() { + gpr_log(GPR_INFO, "Shutdown census stats store."); + init_mutex_once(); + 
gpr_mu_lock(&g_mu); + if (g_client_stats_store != NULL) { + census_ht_destroy(g_client_stats_store); + g_client_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census server stats store not initialized."); + } + if (g_server_stats_store != NULL) { + census_ht_destroy(g_server_stats_store); + g_server_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census client stats store not initialized."); + } + gpr_mu_unlock(&g_mu); +} diff --git a/src/core/statistics/census_rpc_stats.h b/src/core/statistics/census_rpc_stats.h index 6ab7614805..e1ff3ac31b 100644 --- a/src/core/statistics/census_rpc_stats.h +++ b/src/core/statistics/census_rpc_stats.h @@ -37,6 +37,10 @@ #include "src/core/statistics/census_interface.h" #include <grpc/support/port_platform.h> +#ifdef __cplusplus +extern "C" { +#endif + struct census_rpc_stats { gpr_uint64 cnt; gpr_uint64 rpc_error_cnt; @@ -51,19 +55,20 @@ struct census_rpc_stats { /* Creates an empty rpc stats object on heap. */ census_rpc_stats* census_rpc_stats_create_empty(); -typedef struct census_per_service_per_method_rpc_stats { - const char* service; +typedef struct census_per_method_rpc_stats { const char* method; - census_rpc_stats data; -} census_per_service_per_method_rpc_stats; + census_rpc_stats minute_stats; /* cumulative stats in the past minute */ + census_rpc_stats hour_stats; /* cumulative stats in the past hour */ + census_rpc_stats total_stats; /* cumulative stats from last gc */ +} census_per_method_rpc_stats; typedef struct census_aggregated_rpc_stats { int num_entries; - census_per_service_per_method_rpc_stats* stats; + census_per_method_rpc_stats* stats; } census_aggregated_rpc_stats; -/* Deletes aggregated data. */ -void census_aggregated_rpc_stats_destroy(census_aggregated_rpc_stats* data); +/* Initializes an aggregated rpc stats object to an empty state. */ +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats* data); /* Records client side stats of a rpc. 
*/ void census_record_rpc_client_stats(census_op_id op_id, @@ -86,4 +91,11 @@ void census_get_server_stats(census_aggregated_rpc_stats* data_map); DO NOT CALL from outside of grpc code. */ void census_get_client_stats(census_aggregated_rpc_stats* data_map); +void census_stats_store_init(); +void census_stats_store_shutdown(); + +#ifdef __cplusplus +} +#endif + #endif /* __GRPC_INTERNAL_STATISTICS_CENSUS_RPC_STATS_H__ */ diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c index d0c9032837..d37c427c5b 100644 --- a/src/core/statistics/census_tracing.c +++ b/src/core/statistics/census_tracing.c @@ -33,15 +33,154 @@ #include "src/core/statistics/census_interface.h" +#include <stdio.h> +#include <string.h> + +#include "src/core/statistics/census_rpc_stats.h" +#include "src/core/statistics/hash_table.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +/* Struct for a trace annotation. 
*/ +typedef struct annotation { + gpr_uint64 ts; /* timestamp of the annotation */ + char txt[CENSUS_MAX_ANNOTATION_LENGTH]; /* actual txt annotation */ + struct annotation* next; +} annotation; + +typedef struct trace_obj { + census_op_id id; + gpr_timespec ts; + census_rpc_stats rpc_stats; + char* method; + annotation* annotations; +} trace_obj; + +static void trace_obj_destroy(trace_obj* obj) { + annotation* p = obj->annotations; + while (p != NULL) { + annotation* next = p->next; + gpr_free(p); + p = next; + } + gpr_free(obj->method); + gpr_free(obj); +} + +static void delete_trace_obj(void* obj) { trace_obj_destroy((trace_obj*)obj); } + +static const census_ht_option ht_opt = { + CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */, + NULL /* compare_keys */, delete_trace_obj /* delete data */, + NULL /* delete key */}; + +static gpr_once g_init_mutex_once = GPR_ONCE_INIT; +static gpr_mu g_mu; /* Guards following two static variables. */ +static census_ht* g_trace_store = NULL; +static gpr_uint64 g_id = 0; + +static census_ht_key op_id_as_key(census_op_id* id) { + return *(census_ht_key*)id; +} + +static gpr_uint64 op_id_2_uint64(census_op_id* id) { + gpr_uint64 ret; + memcpy(&ret, id, sizeof(census_op_id)); + return ret; +} + +static void init_mutex() { gpr_mu_init(&g_mu); } + +static void init_mutex_once() { gpr_once_init(&g_init_mutex_once, init_mutex); } + census_op_id census_tracing_start_op() { - census_op_id empty_op_id = {0, 0}; - return empty_op_id; + gpr_mu_lock(&g_mu); + { + trace_obj* ret = (trace_obj*)gpr_malloc(sizeof(trace_obj)); + memset(ret, 0, sizeof(trace_obj)); + g_id++; + memcpy(&ret->id, &g_id, sizeof(census_op_id)); + ret->rpc_stats.cnt = 1; + ret->ts = gpr_now(); + census_ht_insert(g_trace_store, op_id_as_key(&ret->id), (void*)ret); + gpr_mu_unlock(&g_mu); + gpr_log(GPR_DEBUG, "Start tracing for id %lu\n", g_id); + return ret->id; + } } -int census_add_method_tag(census_op_id op_id, const char* method_name) { - return 
0; +int census_add_method_tag(census_op_id op_id, const char* method) { + int ret = 0; + trace_obj* trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace == NULL) { + ret = 1; + } else { + trace->method = gpr_strdup(method); + } + gpr_mu_unlock(&g_mu); + return ret; } void census_tracing_print(census_op_id op_id, const char* annotation) {} -void census_tracing_end_op(census_op_id op_id) {} +void census_tracing_end_op(census_op_id op_id) { + trace_obj* trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace != NULL) { + trace->rpc_stats.elapsed_time_ms = + gpr_timespec_to_micros(gpr_time_sub(gpr_now(), trace->ts)); + gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us\n", + op_id_2_uint64(&op_id), trace->method, + trace->rpc_stats.elapsed_time_ms); + census_ht_erase(g_trace_store, op_id_as_key(&op_id)); + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_init() { + gpr_log(GPR_INFO, "Initialize census trace store."); + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_trace_store == NULL) { + g_id = 1; + g_trace_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census trace store already initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_shutdown() { + gpr_log(GPR_INFO, "Shutdown census trace store."); + gpr_mu_lock(&g_mu); + if (g_trace_store != NULL) { + census_ht_destroy(g_trace_store); + g_trace_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census trace store is not initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_internal_lock_trace_store() { gpr_mu_lock(&g_mu); } + +void census_internal_unlock_trace_store() { gpr_mu_unlock(&g_mu); } + +trace_obj* census_get_trace_obj_locked(census_op_id op_id) { + if (g_trace_store == NULL) { + gpr_log(GPR_ERROR, "Census trace store is not initialized."); + return NULL; + } + return (trace_obj*)census_ht_find(g_trace_store, op_id_as_key(&op_id)); +} 
+ +const char* census_get_trace_method_name(const trace_obj* trace) { + return (const char*)trace->method; +} diff --git a/src/core/statistics/census_tracing.h b/src/core/statistics/census_tracing.h new file mode 100644 index 0000000000..2285a5cd6b --- /dev/null +++ b/src/core/statistics/census_tracing.h @@ -0,0 +1,59 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_ +#define __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_ + +/* Opaque structure for trace object */ +typedef struct trace_obj trace_obj; + +/* Initializes trace store. This function is thread safe. */ +void census_tracing_init(); + +/* Shutsdown trace store. This function is thread safe. */ +void census_tracing_shutdown(); + +/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store + is not initialized or trace obj is not found. Requires trace store being + locked before calling this function. */ +trace_obj* census_get_trace_obj_locked(census_op_id op_id); + +/* The following two functions acquire and release the trace store global lock. + They are for census internal use only. */ +void census_internal_lock_trace_store(); +void census_internal_unlock_trace_store(); + +/* Gets method tag name associated with the input trace object. */ +const char* census_get_trace_method_name(const trace_obj* trace); + +#endif /* __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_ */ diff --git a/src/core/support/time.c b/src/core/support/time.c index 5330092f56..0e88c65be0 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -264,3 +264,7 @@ gpr_int32 gpr_time_to_millis(gpr_timespec t) { return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS; } } + +double gpr_timespec_to_micros(gpr_timespec t) { + return t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3; +} |