diff options
Diffstat (limited to 'src/core/lib/statistics')
-rw-r--r-- | src/core/lib/statistics/census_init.c | 48 | ||||
-rw-r--r-- | src/core/lib/statistics/census_interface.h | 76 | ||||
-rw-r--r-- | src/core/lib/statistics/census_log.c | 603 | ||||
-rw-r--r-- | src/core/lib/statistics/census_log.h | 91 | ||||
-rw-r--r-- | src/core/lib/statistics/census_rpc_stats.c | 253 | ||||
-rw-r--r-- | src/core/lib/statistics/census_rpc_stats.h | 101 | ||||
-rw-r--r-- | src/core/lib/statistics/census_tracing.c | 241 | ||||
-rw-r--r-- | src/core/lib/statistics/census_tracing.h | 96 | ||||
-rw-r--r-- | src/core/lib/statistics/hash_table.c | 303 | ||||
-rw-r--r-- | src/core/lib/statistics/hash_table.h | 131 | ||||
-rw-r--r-- | src/core/lib/statistics/window_stats.c | 316 | ||||
-rw-r--r-- | src/core/lib/statistics/window_stats.h | 173 |
12 files changed, 2432 insertions, 0 deletions
diff --git a/src/core/lib/statistics/census_init.c b/src/core/lib/statistics/census_init.c new file mode 100644 index 0000000000..bbecd62764 --- /dev/null +++ b/src/core/lib/statistics/census_init.c @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/statistics/census_interface.h" + +#include <grpc/support/log.h> +#include "src/core/lib/statistics/census_rpc_stats.h" +#include "src/core/lib/statistics/census_tracing.h" + +void census_init(void) { + census_tracing_init(); + census_stats_store_init(); +} + +void census_shutdown(void) { + census_stats_store_shutdown(); + census_tracing_shutdown(); +} diff --git a/src/core/lib/statistics/census_interface.h b/src/core/lib/statistics/census_interface.h new file mode 100644 index 0000000000..b3b3439072 --- /dev/null +++ b/src/core/lib/statistics/census_interface.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_STATISTICS_CENSUS_INTERFACE_H +#define GRPC_CORE_LIB_STATISTICS_CENSUS_INTERFACE_H + +#include <grpc/support/port_platform.h> + +/* Maximum length of an individual census trace annotation. */ +#define CENSUS_MAX_ANNOTATION_LENGTH 200 + +/* Structure of a census op id. Define as structure because 64bit integer is not + available on every platform for C89. */ +typedef struct census_op_id { + uint32_t upper; + uint32_t lower; +} census_op_id; + +typedef struct census_rpc_stats census_rpc_stats; + +/* Initializes Census library. No-op if Census is already initialized. */ +void census_init(void); + +/* Shutdown Census Library. */ +void census_shutdown(void); + +/* Annotates grpc method name on a census_op_id. The method name has the format + of <full quantified rpc service name>/<rpc function name>. Returns 0 iff + op_id and method_name are all valid. op_id is valid after its creation and + before calling census_tracing_end_op(). + + TODO(hongyu): Figure out valid characters set for service name and command + name and document requirements here.*/ +int census_add_method_tag(census_op_id op_id, const char *method_name); + +/* Annotates tracing information to a specific op_id. + Up to CENSUS_MAX_ANNOTATION_LENGTH bytes are recorded. */ +void census_tracing_print(census_op_id op_id, const char *annotation); + +/* Starts tracing for an RPC. Returns a locally unique census_op_id */ +census_op_id census_tracing_start_op(void); + +/* Ends tracing. Calling this function will invalidate the input op_id. */ +void census_tracing_end_op(census_op_id op_id); + +#endif /* GRPC_CORE_LIB_STATISTICS_CENSUS_INTERFACE_H */ diff --git a/src/core/lib/statistics/census_log.c b/src/core/lib/statistics/census_log.c new file mode 100644 index 0000000000..1fb942a78a --- /dev/null +++ b/src/core/lib/statistics/census_log.c @@ -0,0 +1,603 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Available log space is divided up in blocks of + CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the + following three data structures: + - Free blocks (free_block_list) + - Blocks with unread data (dirty_block_list) + - Blocks currently attached to cores (core_local_blocks[]) + + census_log_start_write() moves a block from core_local_blocks[] to the + end of dirty_block_list when block: + - is out-of-space OR + - has an incomplete record (an incomplete record occurs when a thread calls + census_log_start_write() and is context-switched before calling + census_log_end_write() + So, blocks in dirty_block_list are ordered, from oldest to newest, by time + when block is detached from the core. + + census_log_read_next() first iterates over dirty_block_list and then + core_local_blocks[]. It moves completely read blocks from dirty_block_list + to free_block_list. Blocks in core_local_blocks[] are not freed, even when + completely read. + + If log is configured to discard old records and free_block_list is empty, + census_log_start_write() iterates over dirty_block_list to allocate a + new block. It moves the oldest available block (no pending read/write) to + core_local_blocks[]. + + core_local_block_struct is used to implement a map from core id to the block + associated with that core. This mapping is advisory. It is possible that the + block returned by this mapping is no longer associated with that core. This + mapping is updated, lazily, by census_log_start_write(). + + Locking in block struct: + + Exclusive g_log.lock must be held before calling any functions operatong on + block structs except census_log_start_write() and + census_log_end_write(). + + Writes to a block are serialized via writer_lock. + census_log_start_write() acquires this lock and + census_log_end_write() releases it. On failure to acquire the lock, + writer allocates a new block for the current core and updates + core_local_block accordingly. + + Simultaneous read and write access is allowed. Reader can safely read up to + committed bytes (bytes_committed). + + reader_lock protects the block, currently being read, from getting recycled. + start_read() acquires reader_lock and end_read() releases the lock. + + Read/write access to a block is disabled via try_disable_access(). It returns + with both writer_lock and reader_lock held. These locks are subsequently + released by enable_access() to enable access to the block. + + A note on naming: Most function/struct names are prepended by cl_ + (shorthand for census_log). Further, functions that manipulate structures + include the name of the structure, which will be passed as the first + argument. E.g. cl_block_initialize() will initialize a cl_block. +*/ +#include "src/core/lib/statistics/census_log.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/cpu.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> +#include <string.h> + +/* End of platform specific code */ + +typedef struct census_log_block_list_struct { + struct census_log_block_list_struct *next; + struct census_log_block_list_struct *prev; + struct census_log_block *block; +} cl_block_list_struct; + +typedef struct census_log_block { + /* Pointer to underlying buffer */ + char *buffer; + gpr_atm writer_lock; + gpr_atm reader_lock; + /* Keeps completely written bytes. Declared atomic because accessed + simultaneously by reader and writer. */ + gpr_atm bytes_committed; + /* Bytes already read */ + int32_t bytes_read; + /* Links for list */ + cl_block_list_struct link; +/* We want this structure to be cacheline aligned. We assume the following + sizes for the various parts on 32/64bit systems: + type 32b size 64b size + char* 4 8 + 3x gpr_atm 12 24 + int32_t 4 8 (assumes padding) + cl_block_list_struct 12 24 + TOTAL 32 64 + + Depending on the size of our cacheline and the architecture, we + selectively add char buffering to this structure. The size is checked + via assert in census_log_initialize(). */ +#if defined(GPR_ARCH_64) +#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64) +#else +#if defined(GPR_ARCH_32) +#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32) +#else +#error "Unknown architecture" +#endif +#endif +#if CL_BLOCK_PAD_SIZE > 0 + char padding[CL_BLOCK_PAD_SIZE]; +#endif +} cl_block; + +/* A list of cl_blocks, doubly-linked through cl_block::link. */ +typedef struct census_log_block_list { + int32_t count; /* Number of items in list. */ + cl_block_list_struct ht; /* head/tail of linked list. */ +} cl_block_list; + +/* Cacheline aligned block pointers to avoid false sharing. Block pointer must + be initialized via set_block(), before calling other functions */ +typedef struct census_log_core_local_block { + gpr_atm block; +/* Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 */ +#if defined(GPR_ARCH_64) +#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8) +#else +#if defined(GPR_ARCH_32) +#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4) +#else +#error "Unknown architecture" +#endif +#endif +#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0 + char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE]; +#endif +} cl_core_local_block; + +struct census_log { + int discard_old_records; + /* Number of cores (aka hardware-contexts) */ + unsigned num_cores; + /* number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log */ + int32_t num_blocks; + cl_block *blocks; /* Block metadata. */ + cl_core_local_block *core_local_blocks; /* Keeps core to block mappings. */ + gpr_mu lock; + int initialized; /* has log been initialized? */ + /* Keeps the state of the reader iterator. A value of 0 indicates that + iterator has reached the end. census_log_init_reader() resets the + value to num_core to restart iteration. */ + uint32_t read_iterator_state; + /* Points to the block being read. If non-NULL, the block is locked for + reading (block_being_read_->reader_lock is held). */ + cl_block *block_being_read; + /* A non-zero value indicates that log is full. */ + gpr_atm is_full; + char *buffer; + cl_block_list free_block_list; + cl_block_list dirty_block_list; + gpr_atm out_of_space_count; +}; + +/* Single internal log */ +static struct census_log g_log; + +/* Functions that operate on an atomic memory location used as a lock */ + +/* Returns non-zero if lock is acquired */ +static int cl_try_lock(gpr_atm *lock) { return gpr_atm_acq_cas(lock, 0, 1); } + +static void cl_unlock(gpr_atm *lock) { gpr_atm_rel_store(lock, 0); } + +/* Functions that operate on cl_core_local_block's */ + +static void cl_core_local_block_set_block(cl_core_local_block *clb, + cl_block *block) { + gpr_atm_rel_store(&clb->block, (gpr_atm)block); +} + +static cl_block *cl_core_local_block_get_block(cl_core_local_block *clb) { + return (cl_block *)gpr_atm_acq_load(&clb->block); +} + +/* Functions that operate on cl_block_list_struct's */ + +static void cl_block_list_struct_initialize(cl_block_list_struct *bls, + cl_block *block) { + bls->next = bls->prev = bls; + bls->block = block; +} + +/* Functions that operate on cl_block_list's */ + +static void cl_block_list_initialize(cl_block_list *list) { + list->count = 0; + cl_block_list_struct_initialize(&list->ht, NULL); +} + +/* Returns head of *this, or NULL if empty. */ +static cl_block *cl_block_list_head(cl_block_list *list) { + return list->ht.next->block; +} + +/* Insert element *e after *pos. */ +static void cl_block_list_insert(cl_block_list *list, cl_block_list_struct *pos, + cl_block_list_struct *e) { + list->count++; + e->next = pos->next; + e->prev = pos; + e->next->prev = e; + e->prev->next = e; +} + +/* Insert block at the head of the list */ +static void cl_block_list_insert_at_head(cl_block_list *list, cl_block *block) { + cl_block_list_insert(list, &list->ht, &block->link); +} + +/* Insert block at the tail of the list */ +static void cl_block_list_insert_at_tail(cl_block_list *list, cl_block *block) { + cl_block_list_insert(list, list->ht.prev, &block->link); +} + +/* Removes block *b. Requires *b be in the list. */ +static void cl_block_list_remove(cl_block_list *list, cl_block *b) { + list->count--; + b->link.next->prev = b->link.prev; + b->link.prev->next = b->link.next; +} + +/* Functions that operate on cl_block's */ + +static void cl_block_initialize(cl_block *block, char *buffer) { + block->buffer = buffer; + gpr_atm_rel_store(&block->writer_lock, 0); + gpr_atm_rel_store(&block->reader_lock, 0); + gpr_atm_rel_store(&block->bytes_committed, 0); + block->bytes_read = 0; + cl_block_list_struct_initialize(&block->link, block); +} + +/* Guards against exposing partially written buffer to the reader. */ +static void cl_block_set_bytes_committed(cl_block *block, + int32_t bytes_committed) { + gpr_atm_rel_store(&block->bytes_committed, bytes_committed); +} + +static int32_t cl_block_get_bytes_committed(cl_block *block) { + return gpr_atm_acq_load(&block->bytes_committed); +} + +/* Tries to disable future read/write access to this block. Succeeds if: + - no in-progress write AND + - no in-progress read AND + - 'discard_data' set to true OR no unread data + On success, clears the block state and returns with writer_lock_ and + reader_lock_ held. These locks are released by a subsequent + cl_block_access_enable() call. */ +static int cl_block_try_disable_access(cl_block *block, int discard_data) { + if (!cl_try_lock(&block->writer_lock)) { + return 0; + } + if (!cl_try_lock(&block->reader_lock)) { + cl_unlock(&block->writer_lock); + return 0; + } + if (!discard_data && + (block->bytes_read != cl_block_get_bytes_committed(block))) { + cl_unlock(&block->reader_lock); + cl_unlock(&block->writer_lock); + return 0; + } + cl_block_set_bytes_committed(block, 0); + block->bytes_read = 0; + return 1; +} + +static void cl_block_enable_access(cl_block *block) { + cl_unlock(&block->reader_lock); + cl_unlock(&block->writer_lock); +} + +/* Returns with writer_lock held. */ +static void *cl_block_start_write(cl_block *block, size_t size) { + int32_t bytes_committed; + if (!cl_try_lock(&block->writer_lock)) { + return NULL; + } + bytes_committed = cl_block_get_bytes_committed(block); + if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) { + cl_unlock(&block->writer_lock); + return NULL; + } + return block->buffer + bytes_committed; +} + +/* Releases writer_lock and increments committed bytes by 'bytes_written'. + 'bytes_written' must be <= 'size' specified in the corresponding + StartWrite() call. This function is thread-safe. */ +static void cl_block_end_write(cl_block *block, size_t bytes_written) { + cl_block_set_bytes_committed( + block, cl_block_get_bytes_committed(block) + bytes_written); + cl_unlock(&block->writer_lock); +} + +/* Returns a pointer to the first unread byte in buffer. The number of bytes + available are returned in 'bytes_available'. Acquires reader lock that is + released by a subsequent cl_block_end_read() call. Returns NULL if: + - read in progress + - no data available */ +static void *cl_block_start_read(cl_block *block, size_t *bytes_available) { + void *record; + if (!cl_try_lock(&block->reader_lock)) { + return NULL; + } + /* bytes_committed may change from under us. Use bytes_available to update + bytes_read below. */ + *bytes_available = cl_block_get_bytes_committed(block) - block->bytes_read; + if (*bytes_available == 0) { + cl_unlock(&block->reader_lock); + return NULL; + } + record = block->buffer + block->bytes_read; + block->bytes_read += *bytes_available; + return record; +} + +static void cl_block_end_read(cl_block *block) { + cl_unlock(&block->reader_lock); +} + +/* Internal functions operating on g_log */ + +/* Allocates a new free block (or recycles an available dirty block if log is + configured to discard old records). Returns NULL if out-of-space. */ +static cl_block *cl_allocate_block(void) { + cl_block *block = cl_block_list_head(&g_log.free_block_list); + if (block != NULL) { + cl_block_list_remove(&g_log.free_block_list, block); + return block; + } + if (!g_log.discard_old_records) { + /* No free block and log is configured to keep old records. */ + return NULL; + } + /* Recycle dirty block. Start from the oldest. */ + for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL; + block = block->link.next->block) { + if (cl_block_try_disable_access(block, 1 /* discard data */)) { + cl_block_list_remove(&g_log.dirty_block_list, block); + return block; + } + } + return NULL; +} + +/* Allocates a new block and updates core id => block mapping. 'old_block' + points to the block that the caller thinks is attached to + 'core_id'. 'old_block' may be NULL. Returns non-zero if: + - allocated a new block OR + - 'core_id' => 'old_block' mapping changed (another thread allocated a + block before lock was acquired). */ +static int cl_allocate_core_local_block(int32_t core_id, cl_block *old_block) { + /* Now that we have the lock, check if core-local mapping has changed. */ + cl_core_local_block *core_local_block = &g_log.core_local_blocks[core_id]; + cl_block *block = cl_core_local_block_get_block(core_local_block); + if ((block != NULL) && (block != old_block)) { + return 1; + } + if (block != NULL) { + cl_core_local_block_set_block(core_local_block, NULL); + cl_block_list_insert_at_tail(&g_log.dirty_block_list, block); + } + block = cl_allocate_block(); + if (block == NULL) { + gpr_atm_rel_store(&g_log.is_full, 1); + return 0; + } + cl_core_local_block_set_block(core_local_block, block); + cl_block_enable_access(block); + return 1; +} + +static cl_block *cl_get_block(void *record) { + uintptr_t p = (uintptr_t)((char *)record - g_log.buffer); + uintptr_t index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE; + return &g_log.blocks[index]; +} + +/* Gets the next block to read and tries to free 'prev' block (if not NULL). + Returns NULL if reached the end. */ +static cl_block *cl_next_block_to_read(cl_block *prev) { + cl_block *block = NULL; + if (g_log.read_iterator_state == g_log.num_cores) { + /* We are traversing dirty list; find the next dirty block. */ + if (prev != NULL) { + /* Try to free the previous block if there is no unread data. This block + may have unread data if previously incomplete record completed between + read_next() calls. */ + block = prev->link.next->block; + if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) { + cl_block_list_remove(&g_log.dirty_block_list, prev); + cl_block_list_insert_at_head(&g_log.free_block_list, prev); + gpr_atm_rel_store(&g_log.is_full, 0); + } + } else { + block = cl_block_list_head(&g_log.dirty_block_list); + } + if (block != NULL) { + return block; + } + /* We are done with the dirty list; moving on to core-local blocks. */ + } + while (g_log.read_iterator_state > 0) { + g_log.read_iterator_state--; + block = cl_core_local_block_get_block( + &g_log.core_local_blocks[g_log.read_iterator_state]); + if (block != NULL) { + return block; + } + } + return NULL; +} + +/* External functions: primary stats_log interface */ +void census_log_initialize(size_t size_in_mb, int discard_old_records) { + int32_t ix; + /* Check cacheline alignment. */ + GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0); + GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0); + GPR_ASSERT(!g_log.initialized); + g_log.discard_old_records = discard_old_records; + g_log.num_cores = gpr_cpu_num_cores(); + /* Ensure at least as many blocks as there are cores. */ + g_log.num_blocks = GPR_MAX( + g_log.num_cores, (size_in_mb << 20) >> CENSUS_LOG_2_MAX_RECORD_SIZE); + gpr_mu_init(&g_log.lock); + g_log.read_iterator_state = 0; + g_log.block_being_read = NULL; + gpr_atm_rel_store(&g_log.is_full, 0); + g_log.core_local_blocks = (cl_core_local_block *)gpr_malloc_aligned( + g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG); + memset(g_log.core_local_blocks, 0, + g_log.num_cores * sizeof(cl_core_local_block)); + g_log.blocks = (cl_block *)gpr_malloc_aligned( + g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE_LOG); + memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block)); + g_log.buffer = gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE); + memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE); + cl_block_list_initialize(&g_log.free_block_list); + cl_block_list_initialize(&g_log.dirty_block_list); + for (ix = 0; ix < g_log.num_blocks; ++ix) { + cl_block *block = g_log.blocks + ix; + cl_block_initialize(block, + g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * ix)); + cl_block_try_disable_access(block, 1 /* discard data */); + cl_block_list_insert_at_tail(&g_log.free_block_list, block); + } + gpr_atm_rel_store(&g_log.out_of_space_count, 0); + g_log.initialized = 1; +} + +void census_log_shutdown(void) { + GPR_ASSERT(g_log.initialized); + gpr_mu_destroy(&g_log.lock); + gpr_free_aligned(g_log.core_local_blocks); + g_log.core_local_blocks = NULL; + gpr_free_aligned(g_log.blocks); + g_log.blocks = NULL; + gpr_free(g_log.buffer); + g_log.buffer = NULL; + g_log.initialized = 0; +} + +void *census_log_start_write(size_t size) { + /* Used to bound number of times block allocation is attempted. */ + int32_t attempts_remaining = g_log.num_blocks; + /* TODO(aveitch): move this inside the do loop when current_cpu is fixed */ + int32_t core_id = gpr_cpu_current_cpu(); + GPR_ASSERT(g_log.initialized); + if (size > CENSUS_LOG_MAX_RECORD_SIZE) { + return NULL; + } + do { + int allocated; + void *record = NULL; + cl_block *block = + cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]); + if (block && (record = cl_block_start_write(block, size))) { + return record; + } + /* Need to allocate a new block. We are here if: + - No block associated with the core OR + - Write in-progress on the block OR + - block is out of space */ + if (gpr_atm_acq_load(&g_log.is_full)) { + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; + } + gpr_mu_lock(&g_log.lock); + allocated = cl_allocate_core_local_block(core_id, block); + gpr_mu_unlock(&g_log.lock); + if (!allocated) { + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; + } + } while (attempts_remaining--); + /* Give up. */ + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; +} + +void census_log_end_write(void *record, size_t bytes_written) { + GPR_ASSERT(g_log.initialized); + cl_block_end_write(cl_get_block(record), bytes_written); +} + +void census_log_init_reader(void) { + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + /* If a block is locked for reading unlock it. */ + if (g_log.block_being_read != NULL) { + cl_block_end_read(g_log.block_being_read); + g_log.block_being_read = NULL; + } + g_log.read_iterator_state = g_log.num_cores; + gpr_mu_unlock(&g_log.lock); +} + +const void *census_log_read_next(size_t *bytes_available) { + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + if (g_log.block_being_read != NULL) { + cl_block_end_read(g_log.block_being_read); + } + do { + g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read); + if (g_log.block_being_read != NULL) { + void *record = + cl_block_start_read(g_log.block_being_read, bytes_available); + if (record != NULL) { + gpr_mu_unlock(&g_log.lock); + return record; + } + } + } while (g_log.block_being_read != NULL); + gpr_mu_unlock(&g_log.lock); + return NULL; +} + +size_t census_log_remaining_space(void) { + size_t space; + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + if (g_log.discard_old_records) { + /* Remaining space is not meaningful; just return the entire log space. */ + space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE; + } else { + space = g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE; + } + gpr_mu_unlock(&g_log.lock); + return space; +} + +int census_log_out_of_space_count(void) { + GPR_ASSERT(g_log.initialized); + return gpr_atm_acq_load(&g_log.out_of_space_count); +} diff --git a/src/core/lib/statistics/census_log.h b/src/core/lib/statistics/census_log.h new file mode 100644 index 0000000000..c3fbd555ba --- /dev/null +++ b/src/core/lib/statistics/census_log.h @@ -0,0 +1,91 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_STATISTICS_CENSUS_LOG_H +#define GRPC_CORE_LIB_STATISTICS_CENSUS_LOG_H + +#include <stddef.h> + +/* Maximum record size, in bytes. */ +#define CENSUS_LOG_2_MAX_RECORD_SIZE 14 /* 2^14 = 16KB */ +#define CENSUS_LOG_MAX_RECORD_SIZE (1 << CENSUS_LOG_2_MAX_RECORD_SIZE) + +/* Initialize the statistics logging subsystem with the given log size. A log + size of 0 will result in the smallest possible log for the platform + (approximately CENSUS_LOG_MAX_RECORD_SIZE * gpr_cpu_num_cores()). If + discard_old_records is non-zero, then new records will displace older ones + when the log is full. This function must be called before any other + census_log functions. +*/ +void census_log_initialize(size_t size_in_mb, int discard_old_records); + +/* Shutdown the logging subsystem. Caller must ensure that: + - no in progress or future call to any census_log functions + - no incomplete records +*/ +void census_log_shutdown(void); + +/* Allocates and returns a 'size' bytes record and marks it in use. A + subsequent census_log_end_write() marks the record complete. The + 'bytes_written' census_log_end_write() argument must be <= + 'size'. Returns NULL if out-of-space AND: + - log is configured to keep old records OR + - all blocks are pinned by incomplete records. +*/ +void *census_log_start_write(size_t size); + +void census_log_end_write(void *record, size_t bytes_written); + +/* census_log_read_next() iterates over blocks with data and for each block + returns a pointer to the first unread byte. The number of bytes that can be + read are returned in 'bytes_available'. Reader is expected to read all + available data. Reading the data consumes it i.e. it cannot be read again. + census_log_read_next() returns NULL if the end is reached i.e last block + is read. census_log_init_reader() starts the iteration or aborts the + current iteration. +*/ +void census_log_init_reader(void); +const void *census_log_read_next(size_t *bytes_available); + +/* Returns estimated remaining space across all blocks, in bytes. If log is + configured to discard old records, returns total log space. Otherwise, + returns space available in empty blocks (partially filled blocks are + treated as full). +*/ +size_t census_log_remaining_space(void); + +/* Returns the number of times gprc_stats_log_start_write() failed due to + out-of-space. */ +int census_log_out_of_space_count(void); + +#endif /* GRPC_CORE_LIB_STATISTICS_CENSUS_LOG_H */ diff --git a/src/core/lib/statistics/census_rpc_stats.c b/src/core/lib/statistics/census_rpc_stats.c new file mode 100644 index 0000000000..2182561668 --- /dev/null +++ b/src/core/lib/statistics/census_rpc_stats.c @@ -0,0 +1,253 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include "src/core/lib/statistics/census_interface.h" +#include "src/core/lib/statistics/census_rpc_stats.h" +#include "src/core/lib/statistics/census_tracing.h" +#include "src/core/lib/statistics/hash_table.h" +#include "src/core/lib/statistics/window_stats.h" +#include "src/core/lib/support/murmur_hash.h" +#include "src/core/lib/support/string.h" + +#define NUM_INTERVALS 3 +#define MINUTE_INTERVAL 0 +#define HOUR_INTERVAL 1 +#define TOTAL_INTERVAL 2 + +/* for easier typing */ +typedef census_per_method_rpc_stats per_method_stats; + +/* Ensure mu is only initialized once. */ +static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT; +/* Guards two stats stores. */ +static gpr_mu g_mu; +static census_ht *g_client_stats_store = NULL; +static census_ht *g_server_stats_store = NULL; + +static void init_mutex(void) { gpr_mu_init(&g_mu); } + +static void init_mutex_once(void) { + gpr_once_init(&g_stats_store_mu_init, init_mutex); +} + +static int cmp_str_keys(const void *k1, const void *k2) { + return strcmp((const char *)k1, (const char *)k2); +} + +/* TODO(hongyu): replace it with cityhash64 */ +static uint64_t simple_hash(const void *k) { + size_t len = strlen(k); + uint64_t higher = gpr_murmur_hash3((const char *)k, len / 2, 0); + return higher << 32 | + gpr_murmur_hash3((const char *)k + len / 2, len - len / 2, 0); +} + +static void delete_stats(void *stats) { + census_window_stats_destroy((struct census_window_stats *)stats); +} + +static void delete_key(void *key) { gpr_free(key); } + +static const census_ht_option ht_opt = { + CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */, + simple_hash /* hash function */, cmp_str_keys /* key comparator */, + delete_stats /* data deleter */, delete_key /* key deleter */ +}; + +static void init_rpc_stats(void *stats) { + memset(stats, 0, sizeof(census_rpc_stats)); +} + +static void stat_add_proportion(double p, void *base, const void *addme) { + census_rpc_stats *b = (census_rpc_stats *)base; + census_rpc_stats *a = (census_rpc_stats *)addme; + b->cnt += p * a->cnt; + b->rpc_error_cnt += p * a->rpc_error_cnt; + b->app_error_cnt += p * a->app_error_cnt; + b->elapsed_time_ms += p * a->elapsed_time_ms; + b->api_request_bytes += p * a->api_request_bytes; + b->wire_request_bytes += p * a->wire_request_bytes; + b->api_response_bytes += p * a->api_response_bytes; + b->wire_response_bytes += p * a->wire_response_bytes; +} + +static void stat_add(void *base, const void *addme) { + stat_add_proportion(1.0, base, addme); +} + +static gpr_timespec min_hour_total_intervals[3] = { + {60, 0}, {3600, 0}, {36000000, 0}}; + +static const census_window_stats_stat_info window_stats_settings = { + sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion}; + +census_rpc_stats *census_rpc_stats_create_empty(void) { + census_rpc_stats *ret = + (census_rpc_stats *)gpr_malloc(sizeof(census_rpc_stats)); + memset(ret, 0, sizeof(census_rpc_stats)); + return ret; +} + +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats *data) { + int i = 0; + for (i = 0; i < data->num_entries; i++) { + if (data->stats[i].method != NULL) { + gpr_free((void *)data->stats[i].method); + } + } + if (data->stats != NULL) { + gpr_free(data->stats); + } + data->num_entries = 0; + data->stats = NULL; +} + +static void record_stats(census_ht *store, census_op_id op_id, + const census_rpc_stats *stats) { + gpr_mu_lock(&g_mu); + if (store != NULL) { + census_trace_obj *trace = NULL; + census_internal_lock_trace_store(); + trace = census_get_trace_obj_locked(op_id); + if (trace != NULL) { + const char *method_name = census_get_trace_method_name(trace); + struct census_window_stats *window_stats = NULL; + census_ht_key key; + key.ptr = (void *)method_name; + window_stats = census_ht_find(store, key); + census_internal_unlock_trace_store(); + if (window_stats == NULL) { + window_stats = census_window_stats_create(3, min_hour_total_intervals, + 30, &window_stats_settings); + key.ptr = gpr_strdup(key.ptr); + census_ht_insert(store, key, (void *)window_stats); + } + census_window_stats_add(window_stats, gpr_now(GPR_CLOCK_REALTIME), stats); + } else { + census_internal_unlock_trace_store(); + } + } + gpr_mu_unlock(&g_mu); +} + +void census_record_rpc_client_stats(census_op_id op_id, + const census_rpc_stats *stats) { + record_stats(g_client_stats_store, op_id, stats); +} + +void census_record_rpc_server_stats(census_op_id op_id, + const census_rpc_stats *stats) { + record_stats(g_server_stats_store, op_id, stats); +} + +/* Get stats from input stats store */ +static void get_stats(census_ht *store, census_aggregated_rpc_stats *data) { + GPR_ASSERT(data != NULL); + if (data->num_entries != 0) { + census_aggregated_rpc_stats_set_empty(data); + } + gpr_mu_lock(&g_mu); + if (store != NULL) { + size_t n; + unsigned i, j; + gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + census_ht_kv *kv = census_ht_get_all_elements(store, &n); + if (kv != NULL) { + data->num_entries = n; + data->stats = + (per_method_stats *)gpr_malloc(sizeof(per_method_stats) * n); + for (i = 0; i < n; i++) { + census_window_stats_sums sums[NUM_INTERVALS]; + for (j = 0; j < NUM_INTERVALS; j++) { + sums[j].statistic = (void *)census_rpc_stats_create_empty(); + } + data->stats[i].method = gpr_strdup(kv[i].k.ptr); + census_window_stats_get_sums(kv[i].v, now, sums); + data->stats[i].minute_stats = + *(census_rpc_stats *)sums[MINUTE_INTERVAL].statistic; + data->stats[i].hour_stats = + *(census_rpc_stats *)sums[HOUR_INTERVAL].statistic; + data->stats[i].total_stats = + *(census_rpc_stats *)sums[TOTAL_INTERVAL].statistic; + for (j = 0; j < NUM_INTERVALS; j++) { + gpr_free(sums[j].statistic); + } + } + gpr_free(kv); + } + } + gpr_mu_unlock(&g_mu); +} + +void census_get_client_stats(census_aggregated_rpc_stats *data) { + get_stats(g_client_stats_store, data); +} + +void census_get_server_stats(census_aggregated_rpc_stats *data) { + get_stats(g_server_stats_store, data); +} + +void census_stats_store_init(void) { + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store == NULL && g_server_stats_store == NULL) { + g_client_stats_store = census_ht_create(&ht_opt); + g_server_stats_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census stats store already initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_stats_store_shutdown(void) { + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store != NULL) { + census_ht_destroy(g_client_stats_store); + g_client_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census server stats store not initialized."); + } + if (g_server_stats_store != NULL) { + census_ht_destroy(g_server_stats_store); + g_server_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census client stats store not initialized."); + } + gpr_mu_unlock(&g_mu); +} diff --git a/src/core/lib/statistics/census_rpc_stats.h b/src/core/lib/statistics/census_rpc_stats.h new file mode 100644 index 0000000000..00bb48205e --- /dev/null +++ b/src/core/lib/statistics/census_rpc_stats.h @@ -0,0 +1,101 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_STATISTICS_CENSUS_RPC_STATS_H +#define GRPC_CORE_LIB_STATISTICS_CENSUS_RPC_STATS_H + +#include <grpc/support/port_platform.h> +#include "src/core/lib/statistics/census_interface.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct census_rpc_stats { + uint64_t cnt; + uint64_t rpc_error_cnt; + uint64_t app_error_cnt; + double elapsed_time_ms; + double api_request_bytes; + double wire_request_bytes; + double api_response_bytes; + double wire_response_bytes; +}; + +/* Creates an empty rpc stats object on heap. */ +census_rpc_stats *census_rpc_stats_create_empty(void); + +typedef struct census_per_method_rpc_stats { + const char *method; + census_rpc_stats minute_stats; /* cumulative stats in the past minute */ + census_rpc_stats hour_stats; /* cumulative stats in the past hour */ + census_rpc_stats total_stats; /* cumulative stats from last gc */ +} census_per_method_rpc_stats; + +typedef struct census_aggregated_rpc_stats { + int num_entries; + census_per_method_rpc_stats *stats; +} census_aggregated_rpc_stats; + +/* Initializes an aggregated rpc stats object to an empty state. */ +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats *data); + +/* Records client side stats of a rpc. */ +void census_record_rpc_client_stats(census_op_id op_id, + const census_rpc_stats *stats); + +/* Records server side stats of a rpc. */ +void census_record_rpc_server_stats(census_op_id op_id, + const census_rpc_stats *stats); + +/* The following two functions are intended for inprocess query of + per-service per-method stats from grpc implementations. */ + +/* Populates *data_map with server side aggregated per-service per-method + stats. + DO NOT CALL from outside of grpc code. */ +void census_get_server_stats(census_aggregated_rpc_stats *data_map); + +/* Populates *data_map with client side aggregated per-service per-method + stats. + DO NOT CALL from outside of grpc code. */ +void census_get_client_stats(census_aggregated_rpc_stats *data_map); + +void census_stats_store_init(void); +void census_stats_store_shutdown(void); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_STATISTICS_CENSUS_RPC_STATS_H */ diff --git a/src/core/lib/statistics/census_tracing.c b/src/core/lib/statistics/census_tracing.c new file mode 100644 index 0000000000..b58ae733fc --- /dev/null +++ b/src/core/lib/statistics/census_tracing.c @@ -0,0 +1,241 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/statistics/census_tracing.h" +#include "src/core/lib/statistics/census_interface.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> +#include "src/core/lib/statistics/hash_table.h" +#include "src/core/lib/support/string.h" + +void census_trace_obj_destroy(census_trace_obj *obj) { + census_trace_annotation *p = obj->annotations; + while (p != NULL) { + census_trace_annotation *next = p->next; + gpr_free(p); + p = next; + } + gpr_free(obj->method); + gpr_free(obj); +} + +static void delete_trace_obj(void *obj) { + census_trace_obj_destroy((census_trace_obj *)obj); +} + +static const census_ht_option ht_opt = { + CENSUS_HT_UINT64 /* key type */, + 571 /* n_of_buckets */, + NULL /* hash */, + NULL /* compare_keys */, + delete_trace_obj /* delete data */, + NULL /* delete key */ +}; + +static gpr_once g_init_mutex_once = GPR_ONCE_INIT; +static gpr_mu g_mu; /* Guards following two static variables. */ +static census_ht *g_trace_store = NULL; +static uint64_t g_id = 0; + +static census_ht_key op_id_as_key(census_op_id *id) { + return *(census_ht_key *)id; +} + +static uint64_t op_id_2_uint64(census_op_id *id) { + uint64_t ret; + memcpy(&ret, id, sizeof(census_op_id)); + return ret; +} + +static void init_mutex(void) { gpr_mu_init(&g_mu); } + +static void init_mutex_once(void) { + gpr_once_init(&g_init_mutex_once, init_mutex); +} + +census_op_id census_tracing_start_op(void) { + gpr_mu_lock(&g_mu); + { + census_trace_obj *ret = gpr_malloc(sizeof(census_trace_obj)); + memset(ret, 0, sizeof(census_trace_obj)); + g_id++; + memcpy(&ret->id, &g_id, sizeof(census_op_id)); + ret->rpc_stats.cnt = 1; + ret->ts = gpr_now(GPR_CLOCK_REALTIME); + census_ht_insert(g_trace_store, op_id_as_key(&ret->id), (void *)ret); + gpr_log(GPR_DEBUG, "Start tracing for id %lu", g_id); + gpr_mu_unlock(&g_mu); + return ret->id; + } +} + +int census_add_method_tag(census_op_id op_id, const char *method) { + int ret = 0; + census_trace_obj *trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace == NULL) { + ret = 1; + } else { + trace->method = gpr_strdup(method); + } + gpr_mu_unlock(&g_mu); + return ret; +} + +void census_tracing_print(census_op_id op_id, const char *anno_txt) { + census_trace_obj *trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace != NULL) { + census_trace_annotation *anno = gpr_malloc(sizeof(census_trace_annotation)); + anno->ts = gpr_now(GPR_CLOCK_REALTIME); + { + char *d = anno->txt; + const char *s = anno_txt; + int n = 0; + for (; n < CENSUS_MAX_ANNOTATION_LENGTH && *s != '\0'; ++n) { + *d++ = *s++; + } + *d = '\0'; + } + anno->next = trace->annotations; + trace->annotations = anno; + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_end_op(census_op_id op_id) { + census_trace_obj *trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace != NULL) { + trace->rpc_stats.elapsed_time_ms = gpr_timespec_to_micros( + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), trace->ts)); + gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us", + op_id_2_uint64(&op_id), trace->method, + trace->rpc_stats.elapsed_time_ms); + census_ht_erase(g_trace_store, op_id_as_key(&op_id)); + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_init(void) { + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_trace_store == NULL) { + g_id = 1; + g_trace_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census trace store already initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_shutdown(void) { + gpr_mu_lock(&g_mu); + if (g_trace_store != NULL) { + census_ht_destroy(g_trace_store); + g_trace_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census trace store is not initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_internal_lock_trace_store(void) { gpr_mu_lock(&g_mu); } + +void census_internal_unlock_trace_store(void) { gpr_mu_unlock(&g_mu); } + +census_trace_obj *census_get_trace_obj_locked(census_op_id op_id) { + if (g_trace_store == NULL) { + gpr_log(GPR_ERROR, "Census trace store is not initialized."); + return NULL; + } + return (census_trace_obj *)census_ht_find(g_trace_store, + op_id_as_key(&op_id)); +} + +const char *census_get_trace_method_name(const census_trace_obj *trace) { + return trace->method; +} + +static census_trace_annotation *dup_annotation_chain( + census_trace_annotation *from) { + census_trace_annotation *ret = NULL; + census_trace_annotation **to = &ret; + for (; from != NULL; from = from->next) { + *to = gpr_malloc(sizeof(census_trace_annotation)); + memcpy(*to, from, sizeof(census_trace_annotation)); + to = &(*to)->next; + } + return ret; +} + +static census_trace_obj *trace_obj_dup(census_trace_obj *from) { + census_trace_obj *to = NULL; + GPR_ASSERT(from != NULL); + to = gpr_malloc(sizeof(census_trace_obj)); + to->id = from->id; + to->ts = from->ts; + to->rpc_stats = from->rpc_stats; + to->method = gpr_strdup(from->method); + to->annotations = dup_annotation_chain(from->annotations); + return to; +} + +census_trace_obj **census_get_active_ops(int *num_active_ops) { + census_trace_obj **ret = NULL; + gpr_mu_lock(&g_mu); + if (g_trace_store != NULL) { + size_t n = 0; + census_ht_kv *all_kvs = census_ht_get_all_elements(g_trace_store, &n); + *num_active_ops = (int)n; + if (n != 0) { + size_t i = 0; + ret = gpr_malloc(sizeof(census_trace_obj *) * n); + for (i = 0; i < n; i++) { + ret[i] = trace_obj_dup((census_trace_obj *)all_kvs[i].v); + } + } + gpr_free(all_kvs); + } + gpr_mu_unlock(&g_mu); + return ret; +} diff --git a/src/core/lib/statistics/census_tracing.h b/src/core/lib/statistics/census_tracing.h new file mode 100644 index 0000000000..a101abf3cb --- /dev/null +++ b/src/core/lib/statistics/census_tracing.h @@ -0,0 +1,96 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_STATISTICS_CENSUS_TRACING_H +#define GRPC_CORE_LIB_STATISTICS_CENSUS_TRACING_H + +#include <grpc/support/time.h> +#include "src/core/lib/statistics/census_rpc_stats.h" + +/* WARNING: The data structures and APIs provided by this file are for GRPC + library's internal use ONLY. They might be changed in backward-incompatible + ways and are not subject to any deprecation policy. + They are not recommended for external use. + */ +#ifdef __cplusplus +extern "C" { +#endif + +/* Struct for a trace annotation. */ +typedef struct census_trace_annotation { + gpr_timespec ts; /* timestamp of the annotation */ + char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */ + struct census_trace_annotation *next; +} census_trace_annotation; + +typedef struct census_trace_obj { + census_op_id id; + gpr_timespec ts; + census_rpc_stats rpc_stats; + char *method; + census_trace_annotation *annotations; +} census_trace_obj; + +/* Deletes trace object. */ +void census_trace_obj_destroy(census_trace_obj *obj); + +/* Initializes trace store. This function is thread safe. */ +void census_tracing_init(void); + +/* Shutsdown trace store. This function is thread safe. */ +void census_tracing_shutdown(void); + +/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store + is not initialized or trace obj is not found. Requires trace store being + locked before calling this function. */ +census_trace_obj *census_get_trace_obj_locked(census_op_id op_id); + +/* The following two functions acquire and release the trace store global lock. + They are for census internal use only. */ +void census_internal_lock_trace_store(void); +void census_internal_unlock_trace_store(void); + +/* Gets method name associated with the input trace object. */ +const char *census_get_trace_method_name(const census_trace_obj *trace); + +/* Returns an array of pointers to trace objects of currently active operations + and fills in number of active operations. Returns NULL if there are no active + operations. + Caller owns the returned objects. */ +census_trace_obj **census_get_active_ops(int *num_active_ops); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_STATISTICS_CENSUS_TRACING_H */ diff --git a/src/core/lib/statistics/hash_table.c b/src/core/lib/statistics/hash_table.c new file mode 100644 index 0000000000..18b7442a0c --- /dev/null +++ b/src/core/lib/statistics/hash_table.c @@ -0,0 +1,303 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/statistics/hash_table.h" + +#include <stddef.h> +#include <stdio.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +#define CENSUS_HT_NUM_BUCKETS 1999 + +/* A single hash table data entry */ +typedef struct ht_entry { + census_ht_key key; + void *data; + struct ht_entry *next; +} ht_entry; + +/* hash table bucket */ +typedef struct bucket { + /* NULL if bucket is empty */ + ht_entry *next; + /* -1 if all buckets are empty. */ + int32_t prev_non_empty_bucket; + /* -1 if all buckets are empty. */ + int32_t next_non_empty_bucket; +} bucket; + +struct unresizable_hash_table { + /* Number of entries in the table */ + size_t size; + /* Number of buckets */ + uint32_t num_buckets; + /* Array of buckets initialized at creation time. Memory consumption is + 16 bytes per bucket on a 64-bit platform. */ + bucket *buckets; + /* Index of the first non-empty bucket. -1 iff size == 0. */ + int32_t first_non_empty_bucket; + /* Index of the last non_empty bucket. -1 iff size == 0. */ + int32_t last_non_empty_bucket; + /* Immutable options of this hash table, initialized at creation time. */ + census_ht_option options; +}; + +typedef struct entry_locator { + int32_t bucket_idx; + int is_first_in_chain; + int found; + ht_entry *prev_entry; +} entry_locator; + +/* Asserts if option is not valid. */ +void check_options(const census_ht_option *option) { + GPR_ASSERT(option != NULL); + GPR_ASSERT(option->num_buckets > 0); + GPR_ASSERT(option->key_type == CENSUS_HT_UINT64 || + option->key_type == CENSUS_HT_POINTER); + if (option->key_type == CENSUS_HT_UINT64) { + GPR_ASSERT(option->hash == NULL); + } else if (option->key_type == CENSUS_HT_POINTER) { + GPR_ASSERT(option->hash != NULL); + GPR_ASSERT(option->compare_keys != NULL); + } +} + +#define REMOVE_NEXT(options, ptr) \ + do { \ + ht_entry *tmp = (ptr)->next; \ + (ptr)->next = tmp->next; \ + delete_entry(options, tmp); \ + } while (0) + +static void delete_entry(const census_ht_option *opt, ht_entry *p) { + if (opt->delete_data != NULL) { + opt->delete_data(p->data); + } + if (opt->delete_key != NULL) { + opt->delete_key(p->key.ptr); + } + gpr_free(p); +} + +static uint64_t hash(const census_ht_option *opt, census_ht_key key) { + return opt->key_type == CENSUS_HT_UINT64 ? key.val : opt->hash(key.ptr); +} + +census_ht *census_ht_create(const census_ht_option *option) { + int i; + census_ht *ret = NULL; + check_options(option); + ret = (census_ht *)gpr_malloc(sizeof(census_ht)); + ret->size = 0; + ret->num_buckets = option->num_buckets; + ret->buckets = (bucket *)gpr_malloc(sizeof(bucket) * ret->num_buckets); + ret->options = *option; + /* initialize each bucket */ + for (i = 0; i < ret->options.num_buckets; i++) { + ret->buckets[i].prev_non_empty_bucket = -1; + ret->buckets[i].next_non_empty_bucket = -1; + ret->buckets[i].next = NULL; + } + return ret; +} + +static int32_t find_bucket_idx(const census_ht *ht, census_ht_key key) { + return hash(&ht->options, key) % ht->num_buckets; +} + +static int keys_match(const census_ht_option *opt, const ht_entry *p, + const census_ht_key key) { + GPR_ASSERT(opt->key_type == CENSUS_HT_UINT64 || + opt->key_type == CENSUS_HT_POINTER); + if (opt->key_type == CENSUS_HT_UINT64) return p->key.val == key.val; + return !opt->compare_keys((p->key).ptr, key.ptr); +} + +static entry_locator ht_find(const census_ht *ht, census_ht_key key) { + entry_locator loc = {0, 0, 0, NULL}; + int32_t idx = 0; + ht_entry *ptr = NULL; + GPR_ASSERT(ht != NULL); + idx = find_bucket_idx(ht, key); + ptr = ht->buckets[idx].next; + if (ptr == NULL) { + /* bucket is empty */ + return loc; + } + if (keys_match(&ht->options, ptr, key)) { + loc.bucket_idx = idx; + loc.is_first_in_chain = 1; + loc.found = 1; + return loc; + } else { + for (; ptr->next != NULL; ptr = ptr->next) { + if (keys_match(&ht->options, ptr->next, key)) { + loc.bucket_idx = idx; + loc.is_first_in_chain = 0; + loc.found = 1; + loc.prev_entry = ptr; + return loc; + } + } + } + /* Could not find the key */ + return loc; +} + +void *census_ht_find(const census_ht *ht, census_ht_key key) { + entry_locator loc = ht_find(ht, key); + if (loc.found == 0) { + return NULL; + } + return loc.is_first_in_chain ? ht->buckets[loc.bucket_idx].next->data + : loc.prev_entry->next->data; +} + +void census_ht_insert(census_ht *ht, census_ht_key key, void *data) { + int32_t idx = find_bucket_idx(ht, key); + ht_entry *ptr = NULL; + entry_locator loc = ht_find(ht, key); + if (loc.found) { + /* Replace old value with new value. */ + ptr = loc.is_first_in_chain ? ht->buckets[loc.bucket_idx].next + : loc.prev_entry->next; + if (ht->options.delete_data != NULL) { + ht->options.delete_data(ptr->data); + } + ptr->data = data; + return; + } + + /* first entry in the table. */ + if (ht->size == 0) { + ht->buckets[idx].next_non_empty_bucket = -1; + ht->buckets[idx].prev_non_empty_bucket = -1; + ht->first_non_empty_bucket = idx; + ht->last_non_empty_bucket = idx; + } else if (ht->buckets[idx].next == NULL) { + /* first entry in the bucket. */ + ht->buckets[ht->last_non_empty_bucket].next_non_empty_bucket = idx; + ht->buckets[idx].prev_non_empty_bucket = ht->last_non_empty_bucket; + ht->buckets[idx].next_non_empty_bucket = -1; + ht->last_non_empty_bucket = idx; + } + ptr = (ht_entry *)gpr_malloc(sizeof(ht_entry)); + ptr->key = key; + ptr->data = data; + ptr->next = ht->buckets[idx].next; + ht->buckets[idx].next = ptr; + ht->size++; +} + +void census_ht_erase(census_ht *ht, census_ht_key key) { + entry_locator loc = ht_find(ht, key); + if (loc.found == 0) { + /* noop if not found */ + return; + } + ht->size--; + if (loc.is_first_in_chain) { + bucket *b = &ht->buckets[loc.bucket_idx]; + GPR_ASSERT(b->next != NULL); + /* The only entry in the bucket */ + if (b->next->next == NULL) { + int prev = b->prev_non_empty_bucket; + int next = b->next_non_empty_bucket; + if (prev != -1) { + ht->buckets[prev].next_non_empty_bucket = next; + } else { + ht->first_non_empty_bucket = next; + } + if (next != -1) { + ht->buckets[next].prev_non_empty_bucket = prev; + } else { + ht->last_non_empty_bucket = prev; + } + } + REMOVE_NEXT(&ht->options, b); + } else { + GPR_ASSERT(loc.prev_entry->next != NULL); + REMOVE_NEXT(&ht->options, loc.prev_entry); + } +} + +/* Returns NULL if input table is empty. */ +census_ht_kv *census_ht_get_all_elements(const census_ht *ht, size_t *num) { + census_ht_kv *ret = NULL; + int i = 0; + int32_t idx = -1; + GPR_ASSERT(ht != NULL && num != NULL); + *num = ht->size; + if (*num == 0) { + return NULL; + } + + ret = (census_ht_kv *)gpr_malloc(sizeof(census_ht_kv) * ht->size); + idx = ht->first_non_empty_bucket; + while (idx >= 0) { + ht_entry *ptr = ht->buckets[idx].next; + for (; ptr != NULL; ptr = ptr->next) { + ret[i].k = ptr->key; + ret[i].v = ptr->data; + i++; + } + idx = ht->buckets[idx].next_non_empty_bucket; + } + return ret; +} + +static void ht_delete_entry_chain(const census_ht_option *options, + ht_entry *first) { + if (first == NULL) { + return; + } + if (first->next != NULL) { + ht_delete_entry_chain(options, first->next); + } + delete_entry(options, first); +} + +void census_ht_destroy(census_ht *ht) { + unsigned i; + for (i = 0; i < ht->num_buckets; ++i) { + ht_delete_entry_chain(&ht->options, ht->buckets[i].next); + } + gpr_free(ht->buckets); + gpr_free(ht); +} + +size_t census_ht_get_size(const census_ht *ht) { return ht->size; } diff --git a/src/core/lib/statistics/hash_table.h b/src/core/lib/statistics/hash_table.h new file mode 100644 index 0000000000..8f74ec82aa --- /dev/null +++ b/src/core/lib/statistics/hash_table.h @@ -0,0 +1,131 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_STATISTICS_HASH_TABLE_H +#define GRPC_CORE_LIB_STATISTICS_HASH_TABLE_H + +#include <stddef.h> + +#include <grpc/support/port_platform.h> + +/* A chain based hash table with fixed number of buckets. + Your probably shouldn't use this code directly. It is implemented for the + use case in census trace store and stats store, where number of entries in + the table is in the scale of upto several thousands, entries are added and + removed from the table very frequently (~100k/s), the frequency of find() + operations is roughly several times of the frequency of insert() and erase() + Comparing to find(), the insert(), erase() and get_all_entries() operations + are much less freqent (<1/s). + + Per bucket memory overhead is about (8 + sizeof(intptr_t) bytes. + Per entry memory overhead is about (8 + 2 * sizeof(intptr_t) bytes. + + All functions are not thread-safe. Synchronization will be provided in the + upper layer (in trace store and stats store). +*/ + +/* Opaque hash table struct */ +typedef struct unresizable_hash_table census_ht; + +/* Currently, the hash_table can take two types of keys. (uint64 for trace + store and const char* for stats store). */ +typedef union { + uint64_t val; + void *ptr; +} census_ht_key; + +typedef enum census_ht_key_type { + CENSUS_HT_UINT64 = 0, + CENSUS_HT_POINTER = 1 +} census_ht_key_type; + +typedef struct census_ht_option { + /* Type of hash key */ + census_ht_key_type key_type; + /* Desired number of buckets, preferably a prime number */ + int32_t num_buckets; + /* Fucntion to calculate uint64 hash value of the key. Only takes effect if + key_type is POINTER. */ + uint64_t (*hash)(const void *); + /* Function to compare two keys, returns 0 iff equal. Only takes effect if + key_type is POINTER */ + int (*compare_keys)(const void *k1, const void *k2); + /* Value deleter. NULL if no specialized delete function is needed. */ + void (*delete_data)(void *); + /* Key deleter. NULL if table does not own the key. (e.g. key is part of the + value or key is not owned by the table.) */ + void (*delete_key)(void *); +} census_ht_option; + +/* Creates a hashtable with fixed number of buckets according to the settings + specified in 'options' arg. Function pointers "hash" and "compare_keys" must + be provided if key_type is POINTER. Asserts if fail to create. */ +census_ht *census_ht_create(const census_ht_option *options); + +/* Deletes hash table instance. Frees all dynamic memory owned by ht.*/ +void census_ht_destroy(census_ht *ht); + +/* Inserts the input key-val pair into hash_table. If an entry with the same key + exists in the table, the corresponding value will be overwritten by the input + val. */ +void census_ht_insert(census_ht *ht, census_ht_key key, void *val); + +/* Returns pointer to data, returns NULL if not found. */ +void *census_ht_find(const census_ht *ht, census_ht_key key); + +/* Erase hash table entry with input key. Noop if key is not found. */ +void census_ht_erase(census_ht *ht, census_ht_key key); + +typedef struct census_ht_kv { + census_ht_key k; + void *v; +} census_ht_kv; + +/* Returns an array of pointers to all values in the hash table. Order of the + elements can be arbitrary. Sets 'num' to the size of returned array. Caller + owns returned array. */ +census_ht_kv *census_ht_get_all_elements(const census_ht *ht, size_t *num); + +/* Returns number of elements kept. */ +size_t census_ht_get_size(const census_ht *ht); + +/* Functor applied on each key-value pair while iterating through entries in the + table. The functor should not mutate data. */ +typedef void (*census_ht_itr_cb)(census_ht_key key, const void *val_ptr, + void *state); + +/* Iterates through all key-value pairs in the hash_table. The callback function + should not invalidate data entries. */ +uint64_t census_ht_for_all(const census_ht *ht, census_ht_itr_cb); + +#endif /* GRPC_CORE_LIB_STATISTICS_HASH_TABLE_H */ diff --git a/src/core/lib/statistics/window_stats.c b/src/core/lib/statistics/window_stats.c new file mode 100644 index 0000000000..53427a24bc --- /dev/null +++ b/src/core/lib/statistics/window_stats.c @@ -0,0 +1,316 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/statistics/window_stats.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include <math.h> +#include <stddef.h> +#include <string.h> + +/* typedefs make typing long names easier. Use cws (for census_window_stats) */ +typedef census_window_stats_stat_info cws_stat_info; +typedef struct census_window_stats_sum cws_sum; + +/* Each interval is composed of a number of buckets, which hold a count of + entries and a single statistic */ +typedef struct census_window_stats_bucket { + int64_t count; + void *statistic; +} cws_bucket; + +/* Each interval has a set of buckets, and the variables needed to keep + track of their current state */ +typedef struct census_window_stats_interval_stats { + /* The buckets. There will be 'granularity' + 1 of these. */ + cws_bucket *buckets; + /* Index of the bucket containing the smallest time interval. */ + int bottom_bucket; + /* The smallest time storable in the current window. */ + int64_t bottom; + /* The largest time storable in the current window + 1ns */ + int64_t top; + /* The width of each bucket in ns. */ + int64_t width; +} cws_interval_stats; + +typedef struct census_window_stats { + /* Number of intervals. */ + int nintervals; + /* Number of buckets in each interval. 'granularity' + 1. */ + int nbuckets; + /* Record of stat_info. */ + cws_stat_info stat_info; + /* Stats for each interval. */ + cws_interval_stats *interval_stats; + /* The time the newset stat was recorded. */ + int64_t newest_time; +} window_stats; + +/* Calculate an actual bucket index from a logical index 'IDX'. Other + parameters supply information on the interval struct and overall stats. */ +#define BUCKET_IDX(IS, IDX, WSTATS) \ + ((IS->bottom_bucket + (IDX)) % WSTATS->nbuckets) + +/* The maximum seconds value we can have in a valid timespec. More than this + will result in overflow in timespec_to_ns(). This works out to ~292 years. + TODO: consider using doubles instead of int64. */ +static int64_t max_seconds = (GPR_INT64_MAX - GPR_NS_PER_SEC) / GPR_NS_PER_SEC; + +static int64_t timespec_to_ns(const gpr_timespec ts) { + if (ts.tv_sec > max_seconds) { + return GPR_INT64_MAX - 1; + } + return ts.tv_sec * GPR_NS_PER_SEC + ts.tv_nsec; +} + +static void cws_initialize_statistic(void *statistic, + const cws_stat_info *stat_info) { + if (stat_info->stat_initialize == NULL) { + memset(statistic, 0, stat_info->stat_size); + } else { + stat_info->stat_initialize(statistic); + } +} + +/* Create and initialize a statistic */ +static void *cws_create_statistic(const cws_stat_info *stat_info) { + void *stat = gpr_malloc(stat_info->stat_size); + cws_initialize_statistic(stat, stat_info); + return stat; +} + +window_stats *census_window_stats_create(int nintervals, + const gpr_timespec intervals[], + int granularity, + const cws_stat_info *stat_info) { + window_stats *ret; + int i; + /* validate inputs */ + GPR_ASSERT(nintervals > 0 && granularity > 2 && intervals != NULL && + stat_info != NULL); + for (i = 0; i < nintervals; i++) { + int64_t ns = timespec_to_ns(intervals[i]); + GPR_ASSERT(intervals[i].tv_sec >= 0 && intervals[i].tv_nsec >= 0 && + intervals[i].tv_nsec < GPR_NS_PER_SEC && ns >= 100 && + granularity * 10 <= ns); + } + /* Allocate and initialize relevant data structures */ + ret = (window_stats *)gpr_malloc(sizeof(window_stats)); + ret->nintervals = nintervals; + ret->nbuckets = granularity + 1; + ret->stat_info = *stat_info; + ret->interval_stats = + (cws_interval_stats *)gpr_malloc(nintervals * sizeof(cws_interval_stats)); + for (i = 0; i < nintervals; i++) { + int64_t size_ns = timespec_to_ns(intervals[i]); + cws_interval_stats *is = ret->interval_stats + i; + cws_bucket *buckets = is->buckets = + (cws_bucket *)gpr_malloc(ret->nbuckets * sizeof(cws_bucket)); + int b; + for (b = 0; b < ret->nbuckets; b++) { + buckets[b].statistic = cws_create_statistic(stat_info); + buckets[b].count = 0; + } + is->bottom_bucket = 0; + is->bottom = 0; + is->width = size_ns / granularity; + /* Check for possible overflow issues, and maximize interval size if the + user requested something large enough. */ + if ((GPR_INT64_MAX - is->width) > size_ns) { + is->top = size_ns + is->width; + } else { + is->top = GPR_INT64_MAX; + is->width = GPR_INT64_MAX / (granularity + 1); + } + /* If size doesn't divide evenly, we can have a width slightly too small; + better to have it slightly large. */ + if ((size_ns - (granularity + 1) * is->width) > 0) { + is->width += 1; + } + } + ret->newest_time = 0; + return ret; +} + +/* When we try adding a measurement above the current interval range, we + need to "shift" the buckets sufficiently to cover the new range. */ +static void cws_shift_buckets(const window_stats *wstats, + cws_interval_stats *is, int64_t when_ns) { + int i; + /* number of bucket time widths to "shift" */ + int shift; + /* number of buckets to clear */ + int nclear; + GPR_ASSERT(when_ns >= is->top); + /* number of bucket time widths to "shift" */ + shift = ((when_ns - is->top) / is->width) + 1; + /* number of buckets to clear - limited by actual number of buckets */ + nclear = GPR_MIN(shift, wstats->nbuckets); + for (i = 0; i < nclear; i++) { + int b = BUCKET_IDX(is, i, wstats); + is->buckets[b].count = 0; + cws_initialize_statistic(is->buckets[b].statistic, &wstats->stat_info); + } + /* adjust top/bottom times and current bottom bucket */ + is->bottom_bucket = BUCKET_IDX(is, shift, wstats); + is->top += shift * is->width; + is->bottom += shift * is->width; +} + +void census_window_stats_add(window_stats *wstats, const gpr_timespec when, + const void *stat_value) { + int i; + int64_t when_ns = timespec_to_ns(when); + GPR_ASSERT(wstats->interval_stats != NULL); + for (i = 0; i < wstats->nintervals; i++) { + cws_interval_stats *is = wstats->interval_stats + i; + cws_bucket *bucket; + if (when_ns < is->bottom) { /* Below smallest time in interval: drop */ + continue; + } + if (when_ns >= is->top) { /* above limit: shift buckets */ + cws_shift_buckets(wstats, is, when_ns); + } + /* Add the stat. */ + GPR_ASSERT(is->bottom <= when_ns && when_ns < is->top); + bucket = is->buckets + + BUCKET_IDX(is, (when_ns - is->bottom) / is->width, wstats); + bucket->count++; + wstats->stat_info.stat_add(bucket->statistic, stat_value); + } + if (when_ns > wstats->newest_time) { + wstats->newest_time = when_ns; + } +} + +/* Add a specific bucket contents to an accumulating total. */ +static void cws_add_bucket_to_sum(cws_sum *sum, const cws_bucket *bucket, + const cws_stat_info *stat_info) { + sum->count += bucket->count; + stat_info->stat_add(sum->statistic, bucket->statistic); +} + +/* Add a proportion to an accumulating sum. */ +static void cws_add_proportion_to_sum(double p, cws_sum *sum, + const cws_bucket *bucket, + const cws_stat_info *stat_info) { + sum->count += p * bucket->count; + stat_info->stat_add_proportion(p, sum->statistic, bucket->statistic); +} + +void census_window_stats_get_sums(const window_stats *wstats, + const gpr_timespec when, cws_sum sums[]) { + int i; + int64_t when_ns = timespec_to_ns(when); + GPR_ASSERT(wstats->interval_stats != NULL); + for (i = 0; i < wstats->nintervals; i++) { + int when_bucket; + int new_bucket; + double last_proportion = 1.0; + double bottom_proportion; + cws_interval_stats *is = wstats->interval_stats + i; + cws_sum *sum = sums + i; + sum->count = 0; + cws_initialize_statistic(sum->statistic, &wstats->stat_info); + if (when_ns < is->bottom) { + continue; + } + if (when_ns >= is->top) { + cws_shift_buckets(wstats, is, when_ns); + } + /* Calculating the appropriate amount of which buckets to use can get + complicated. Essentially there are two cases: + 1) if the "top" bucket (new_bucket, where the newest additions to the + stats recorded are entered) corresponds to 'when', then we need + to take a proportion of it - (if when < newest_time) or the full + thing. We also (possibly) need to take a corresponding + proportion of the bottom bucket. + 2) Other cases, we just take a straight proportion. + */ + when_bucket = (when_ns - is->bottom) / is->width; + new_bucket = (wstats->newest_time - is->bottom) / is->width; + if (new_bucket == when_bucket) { + int64_t bottom_bucket_time = is->bottom + when_bucket * is->width; + if (when_ns < wstats->newest_time) { + last_proportion = (double)(when_ns - bottom_bucket_time) / + (double)(wstats->newest_time - bottom_bucket_time); + bottom_proportion = + (double)(is->width - (when_ns - bottom_bucket_time)) / is->width; + } else { + bottom_proportion = + (double)(is->width - (wstats->newest_time - bottom_bucket_time)) / + is->width; + } + } else { + last_proportion = + (double)(when_ns + 1 - is->bottom - when_bucket * is->width) / + is->width; + bottom_proportion = 1.0 - last_proportion; + } + cws_add_proportion_to_sum(last_proportion, sum, + is->buckets + BUCKET_IDX(is, when_bucket, wstats), + &wstats->stat_info); + if (when_bucket != 0) { /* last bucket isn't also bottom bucket */ + int b; + /* Add all of "bottom" bucket if we are looking at a subset of the + full interval, or a proportion if we are adding full interval. */ + cws_add_proportion_to_sum( + (when_bucket == wstats->nbuckets - 1 ? bottom_proportion : 1.0), sum, + is->buckets + is->bottom_bucket, &wstats->stat_info); + /* Add all the remaining buckets (everything but top and bottom). */ + for (b = 1; b < when_bucket; b++) { + cws_add_bucket_to_sum(sum, is->buckets + BUCKET_IDX(is, b, wstats), + &wstats->stat_info); + } + } + } +} + +void census_window_stats_destroy(window_stats *wstats) { + int i; + GPR_ASSERT(wstats->interval_stats != NULL); + for (i = 0; i < wstats->nintervals; i++) { + int b; + for (b = 0; b < wstats->nbuckets; b++) { + gpr_free(wstats->interval_stats[i].buckets[b].statistic); + } + gpr_free(wstats->interval_stats[i].buckets); + } + gpr_free(wstats->interval_stats); + /* Ensure any use-after free triggers assert. */ + wstats->interval_stats = NULL; + gpr_free(wstats); +} diff --git a/src/core/lib/statistics/window_stats.h b/src/core/lib/statistics/window_stats.h new file mode 100644 index 0000000000..8dec50d620 --- /dev/null +++ b/src/core/lib/statistics/window_stats.h @@ -0,0 +1,173 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_STATISTICS_WINDOW_STATS_H +#define GRPC_CORE_LIB_STATISTICS_WINDOW_STATS_H + +#include <grpc/support/time.h> + +/* Keep rolling sums of a user-defined statistic (containing a number of + measurements) over a a number of time intervals ("windows"). For example, + you can use a window_stats object to answer questions such as + "Approximately how many RPCs/s did I receive over the past minute, and + approximately how many bytes did I send out over that period?". + + The type of data to record, and the time intervals to keep are specified + when creating the object via a call to census_window_stats_create(). + + A window's interval is divided into one or more "buckets"; the interval + must be divisible by the number of buckets. Internally, these buckets + control the granularity of window_stats' measurements. Increasing the + number of buckets lets the object respond more quickly to changes in the + overall rate of data added into the object, at the cost of additional + memory usage. + + Here's some code which keeps one minute/hour measurements for two values + (latency in seconds and bytes transferred), with each interval divided into + 4 buckets. + + typedef struct my_stat { + double latency; + int bytes; + } my_stat; + + void add_my_stat(void* base, const void* addme) { + my_stat* b = (my_stat*)base; + const my_stat* a = (const my_stat*)addme; + b->latency += a->latency; + b->bytes += a->bytes; + } + + void add_proportion_my_stat(double p, void* base, const void* addme) { + (my_stat*)result->latency += p * (const my_stat*)base->latency; + (my_stat*)result->bytes += p * (const my_stat*)base->bytes; + } + + #define kNumIntervals 2 + #define kMinInterval 0 + #define kHourInterval 1 + #define kNumBuckets 4 + + const struct census_window_stats_stat_info kMyStatInfo + = { sizeof(my_stat), NULL, add_my_stat, add_proportion_my_stat }; + gpr_timespec intervals[kNumIntervals] = {{60, 0}, {3600, 0}}; + my_stat stat; + my_stat sums[kNumIntervals]; + census_window_stats_sums result[kNumIntervals]; + struct census_window_stats* stats + = census_window_stats_create(kNumIntervals, intervals, kNumBuckets, + &kMyStatInfo); + // Record a new event, taking 15.3ms, transferring 1784 bytes. + stat.latency = 0.153; + stat.bytes = 1784; + census_window_stats_add(stats, gpr_now(GPR_CLOCK_REALTIME), &stat); + // Get sums and print them out + result[kMinInterval].statistic = &sums[kMinInterval]; + result[kHourInterval].statistic = &sums[kHourInterval]; + census_window_stats_get_sums(stats, gpr_now(GPR_CLOCK_REALTIME), result); + printf("%d events/min, average time %gs, average bytes %g\n", + result[kMinInterval].count, + (my_stat*)result[kMinInterval].statistic->latency / + result[kMinInterval].count, + (my_stat*)result[kMinInterval].statistic->bytes / + result[kMinInterval].count + ); + printf("%d events/hr, average time %gs, average bytes %g\n", + result[kHourInterval].count, + (my_stat*)result[kHourInterval].statistic->latency / + result[kHourInterval].count, + (my_stat*)result[kHourInterval].statistic->bytes / + result[kHourInterval].count + ); +*/ + +/* Opaque structure for representing window_stats object */ +struct census_window_stats; + +/* Information provided by API user on the information they want to record */ +typedef struct census_window_stats_stat_info { + /* Number of bytes in user-defined object. */ + size_t stat_size; + /* Function to initialize a user-defined statistics object. If this is set + * to NULL, then the object will be zero-initialized. */ + void (*stat_initialize)(void *stat); + /* Function to add one user-defined statistics object ('addme') to 'base' */ + void (*stat_add)(void *base, const void *addme); + /* As for previous function, but only add a proportion 'p'. This API will + currently only use 'p' values in the range [0,1], but other values are + possible in the future, and should be supported. */ + void (*stat_add_proportion)(double p, void *base, const void *addme); +} census_window_stats_stat_info; + +/* Create a new window_stats object. 'nintervals' is the number of + 'intervals', and must be >=1. 'granularity' is the number of buckets, with + a larger number using more memory, but providing greater accuracy of + results. 'granularity should be > 2. We also require that each interval be + at least 10 * 'granularity' nanoseconds in size. 'stat_info' contains + information about the statistic to be gathered. Intervals greater than ~192 + years will be treated as essentially infinite in size. This function will + GPR_ASSERT() if the object cannot be created or any of the parameters have + invalid values. This function is thread-safe. */ +struct census_window_stats *census_window_stats_create( + int nintervals, const gpr_timespec intervals[], int granularity, + const census_window_stats_stat_info *stat_info); + +/* Add a new measurement (in 'stat_value'), as of a given time ('when'). + This function is thread-compatible. */ +void census_window_stats_add(struct census_window_stats *wstats, + const gpr_timespec when, const void *stat_value); + +/* Structure used to record a single intervals sum for a given statistic */ +typedef struct census_window_stats_sum { + /* Total count of samples. Note that because some internal interpolation + is performed, the count of samples returned for each interval may not be an + integral value. */ + double count; + /* Sum for statistic */ + void *statistic; +} census_window_stats_sums; + +/* Retrieve a set of all values stored in a window_stats object 'wstats'. The + number of 'sums' MUST be the same as the number 'nintervals' used in + census_window_stats_create(). This function is thread-compatible. */ +void census_window_stats_get_sums(const struct census_window_stats *wstats, + const gpr_timespec when, + struct census_window_stats_sum sums[]); + +/* Destroy a window_stats object. Once this function has been called, the + object will no longer be usable from any of the above functions (and + calling them will most likely result in a NULL-pointer dereference or + assertion failure). This function is thread-compatible. */ +void census_window_stats_destroy(struct census_window_stats *wstats); + +#endif /* GRPC_CORE_LIB_STATISTICS_WINDOW_STATS_H */ |