diff options
author | Craig Tiller <ctiller@google.com> | 2016-04-04 13:54:41 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-04-04 13:54:41 -0700 |
commit | 38ec40608f693ffcc52120d5d31bec9084930537 (patch) | |
tree | bd15604e7eea09cbf9ed4596ce42c9b5b7740f59 /src/core/ext | |
parent | c532c21ee52bdcddf7292560673a61538ea3d29f (diff) | |
parent | 1b7c0a2c5cf27d7a77d9c3476fe6406a98ca3d76 (diff) |
Merge github.com:grpc/grpc into split-me-baby-one-more-time
Diffstat (limited to 'src/core/ext')
33 files changed, 5640 insertions, 1 deletions
diff --git a/src/core/ext/census/README.md b/src/core/ext/census/README.md new file mode 100644 index 0000000000..fb615a2194 --- /dev/null +++ b/src/core/ext/census/README.md @@ -0,0 +1,76 @@ +<!--- + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +--> + +# Census - a resource measurement and tracing system + +This directory contains code for Census, which will ultimately provide the +following features for any gRPC-using system: +* A [dapper](http://research.google.com/pubs/pub36356.html)-like tracing + system, enabling tracing across a distributed infrastructure. +* RPC statistics and measurements for key metrics, such as latency, bytes + transferred, number of errors etc. +* Resource measurement framework which can be used for measuring custom + metrics. Through the use of [tags](#Tags), these can be broken down across + the entire distributed stack. +* Easy integration of the above with + [Google Cloud Trace](https://cloud.google.com/tools/cloud-trace) and + [Google Cloud Monitoring](https://cloud.google.com/monitoring/). + +## Concepts + +### Context + +### Operations + +### Tags + +### Metrics + +## API + +### Internal/RPC API + +### External/Client API + +### RPC API + +## Files in this directory + +Note that files and functions in this directory can be split into two +categories: +* Files that define core census library functions. Functions etc. in these + files are named census\_\*, and constitute the core census library + functionality. At some time in the future, these will become a standalone + library. +* Files that define functions etc. that provide a convenient interface between + grpc and the core census functionality. These files are all named + grpc\_\*.{c,h}, and define function names beginning with grpc\_census\_\*. + diff --git a/src/core/ext/census/aggregation.h b/src/core/ext/census/aggregation.h new file mode 100644 index 0000000000..45f789c772 --- /dev/null +++ b/src/core/ext/census/aggregation.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <stddef.h> + +#ifndef GRPC_CORE_EXT_CENSUS_AGGREGATION_H +#define GRPC_CORE_EXT_CENSUS_AGGREGATION_H + +/** Structure used to describe an aggregation type. */ +struct census_aggregation_ops { + /* Create a new aggregation. The pointer returned can be used in future calls + to clone(), free(), record(), data() and reset(). */ + void *(*create)(const void *create_arg); + /* Make a copy of an aggregation created by create() */ + void *(*clone)(const void *aggregation); + /* Destroy an aggregation created by create() */ + void (*free)(void *aggregation); + /* Record a new value against aggregation. */ + void (*record)(void *aggregation, double value); + /* Return current aggregation data. The caller must cast this object into + the correct type for the aggregation result. The object returned can be + freed by using free_data(). */ + void *(*data)(const void *aggregation); + /* free data returned by data() */ + void (*free_data)(void *data); + /* Reset an aggregation to default (zero) values. */ + void (*reset)(void *aggregation); + /* Merge 'from' aggregation into 'to'. Both aggregations must be compatible */ + void (*merge)(void *to, const void *from); + /* Fill buffer with printable string version of aggregation contents. For + debugging only. Returns the number of bytes added to buffer (a value == n + implies the buffer was of insufficient size). */ + size_t (*print)(const void *aggregation, char *buffer, size_t n); +}; + +#endif /* GRPC_CORE_EXT_CENSUS_AGGREGATION_H */ diff --git a/src/core/ext/census/census_init.c b/src/core/ext/census/census_init.c new file mode 100644 index 0000000000..690b09e789 --- /dev/null +++ b/src/core/ext/census/census_init.c @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/census_interface.h" + +#include <grpc/support/log.h> +#include "src/core/ext/census/census_rpc_stats.h" +#include "src/core/ext/census/census_tracing.h" + +void census_init(void) { + census_tracing_init(); + census_stats_store_init(); +} + +void census_shutdown(void) { + census_stats_store_shutdown(); + census_tracing_shutdown(); +} diff --git a/src/core/ext/census/census_interface.h b/src/core/ext/census/census_interface.h new file mode 100644 index 0000000000..57e75f56ee --- /dev/null +++ b/src/core/ext/census/census_interface.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_INTERFACE_H +#define GRPC_CORE_EXT_CENSUS_CENSUS_INTERFACE_H + +#include <grpc/support/port_platform.h> + +/* Maximum length of an individual census trace annotation. */ +#define CENSUS_MAX_ANNOTATION_LENGTH 200 + +/* Structure of a census op id. Define as structure because 64bit integer is not + available on every platform for C89. */ +typedef struct census_op_id { + uint32_t upper; + uint32_t lower; +} census_op_id; + +typedef struct census_rpc_stats census_rpc_stats; + +/* Initializes Census library. No-op if Census is already initialized. */ +void census_init(void); + +/* Shutdown Census Library. */ +void census_shutdown(void); + +/* Annotates grpc method name on a census_op_id. The method name has the format + of <full quantified rpc service name>/<rpc function name>. Returns 0 iff + op_id and method_name are all valid. op_id is valid after its creation and + before calling census_tracing_end_op(). + + TODO(hongyu): Figure out valid characters set for service name and command + name and document requirements here.*/ +int census_add_method_tag(census_op_id op_id, const char *method_name); + +/* Annotates tracing information to a specific op_id. + Up to CENSUS_MAX_ANNOTATION_LENGTH bytes are recorded. */ +void census_tracing_print(census_op_id op_id, const char *annotation); + +/* Starts tracing for an RPC. Returns a locally unique census_op_id */ +census_op_id census_tracing_start_op(void); + +/* Ends tracing. Calling this function will invalidate the input op_id. */ +void census_tracing_end_op(census_op_id op_id); + +#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_INTERFACE_H */ diff --git a/src/core/ext/census/census_log.c b/src/core/ext/census/census_log.c new file mode 100644 index 0000000000..9a7331adc2 --- /dev/null +++ b/src/core/ext/census/census_log.c @@ -0,0 +1,603 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Available log space is divided up in blocks of + CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the + following three data structures: + - Free blocks (free_block_list) + - Blocks with unread data (dirty_block_list) + - Blocks currently attached to cores (core_local_blocks[]) + + census_log_start_write() moves a block from core_local_blocks[] to the + end of dirty_block_list when block: + - is out-of-space OR + - has an incomplete record (an incomplete record occurs when a thread calls + census_log_start_write() and is context-switched before calling + census_log_end_write() + So, blocks in dirty_block_list are ordered, from oldest to newest, by time + when block is detached from the core. + + census_log_read_next() first iterates over dirty_block_list and then + core_local_blocks[]. It moves completely read blocks from dirty_block_list + to free_block_list. Blocks in core_local_blocks[] are not freed, even when + completely read. + + If log is configured to discard old records and free_block_list is empty, + census_log_start_write() iterates over dirty_block_list to allocate a + new block. It moves the oldest available block (no pending read/write) to + core_local_blocks[]. + + core_local_block_struct is used to implement a map from core id to the block + associated with that core. This mapping is advisory. It is possible that the + block returned by this mapping is no longer associated with that core. This + mapping is updated, lazily, by census_log_start_write(). + + Locking in block struct: + + Exclusive g_log.lock must be held before calling any functions operatong on + block structs except census_log_start_write() and + census_log_end_write(). + + Writes to a block are serialized via writer_lock. + census_log_start_write() acquires this lock and + census_log_end_write() releases it. On failure to acquire the lock, + writer allocates a new block for the current core and updates + core_local_block accordingly. + + Simultaneous read and write access is allowed. Reader can safely read up to + committed bytes (bytes_committed). + + reader_lock protects the block, currently being read, from getting recycled. + start_read() acquires reader_lock and end_read() releases the lock. + + Read/write access to a block is disabled via try_disable_access(). It returns + with both writer_lock and reader_lock held. These locks are subsequently + released by enable_access() to enable access to the block. + + A note on naming: Most function/struct names are prepended by cl_ + (shorthand for census_log). Further, functions that manipulate structures + include the name of the structure, which will be passed as the first + argument. E.g. cl_block_initialize() will initialize a cl_block. +*/ +#include "src/core/ext/census/census_log.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/cpu.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> +#include <string.h> + +/* End of platform specific code */ + +typedef struct census_log_block_list_struct { + struct census_log_block_list_struct *next; + struct census_log_block_list_struct *prev; + struct census_log_block *block; +} cl_block_list_struct; + +typedef struct census_log_block { + /* Pointer to underlying buffer */ + char *buffer; + gpr_atm writer_lock; + gpr_atm reader_lock; + /* Keeps completely written bytes. Declared atomic because accessed + simultaneously by reader and writer. */ + gpr_atm bytes_committed; + /* Bytes already read */ + int32_t bytes_read; + /* Links for list */ + cl_block_list_struct link; +/* We want this structure to be cacheline aligned. We assume the following + sizes for the various parts on 32/64bit systems: + type 32b size 64b size + char* 4 8 + 3x gpr_atm 12 24 + int32_t 4 8 (assumes padding) + cl_block_list_struct 12 24 + TOTAL 32 64 + + Depending on the size of our cacheline and the architecture, we + selectively add char buffering to this structure. The size is checked + via assert in census_log_initialize(). */ +#if defined(GPR_ARCH_64) +#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64) +#else +#if defined(GPR_ARCH_32) +#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32) +#else +#error "Unknown architecture" +#endif +#endif +#if CL_BLOCK_PAD_SIZE > 0 + char padding[CL_BLOCK_PAD_SIZE]; +#endif +} cl_block; + +/* A list of cl_blocks, doubly-linked through cl_block::link. */ +typedef struct census_log_block_list { + int32_t count; /* Number of items in list. */ + cl_block_list_struct ht; /* head/tail of linked list. */ +} cl_block_list; + +/* Cacheline aligned block pointers to avoid false sharing. Block pointer must + be initialized via set_block(), before calling other functions */ +typedef struct census_log_core_local_block { + gpr_atm block; +/* Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 */ +#if defined(GPR_ARCH_64) +#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8) +#else +#if defined(GPR_ARCH_32) +#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4) +#else +#error "Unknown architecture" +#endif +#endif +#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0 + char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE]; +#endif +} cl_core_local_block; + +struct census_log { + int discard_old_records; + /* Number of cores (aka hardware-contexts) */ + unsigned num_cores; + /* number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log */ + int32_t num_blocks; + cl_block *blocks; /* Block metadata. */ + cl_core_local_block *core_local_blocks; /* Keeps core to block mappings. */ + gpr_mu lock; + int initialized; /* has log been initialized? */ + /* Keeps the state of the reader iterator. A value of 0 indicates that + iterator has reached the end. census_log_init_reader() resets the + value to num_core to restart iteration. */ + uint32_t read_iterator_state; + /* Points to the block being read. If non-NULL, the block is locked for + reading (block_being_read_->reader_lock is held). */ + cl_block *block_being_read; + /* A non-zero value indicates that log is full. */ + gpr_atm is_full; + char *buffer; + cl_block_list free_block_list; + cl_block_list dirty_block_list; + gpr_atm out_of_space_count; +}; + +/* Single internal log */ +static struct census_log g_log; + +/* Functions that operate on an atomic memory location used as a lock */ + +/* Returns non-zero if lock is acquired */ +static int cl_try_lock(gpr_atm *lock) { return gpr_atm_acq_cas(lock, 0, 1); } + +static void cl_unlock(gpr_atm *lock) { gpr_atm_rel_store(lock, 0); } + +/* Functions that operate on cl_core_local_block's */ + +static void cl_core_local_block_set_block(cl_core_local_block *clb, + cl_block *block) { + gpr_atm_rel_store(&clb->block, (gpr_atm)block); +} + +static cl_block *cl_core_local_block_get_block(cl_core_local_block *clb) { + return (cl_block *)gpr_atm_acq_load(&clb->block); +} + +/* Functions that operate on cl_block_list_struct's */ + +static void cl_block_list_struct_initialize(cl_block_list_struct *bls, + cl_block *block) { + bls->next = bls->prev = bls; + bls->block = block; +} + +/* Functions that operate on cl_block_list's */ + +static void cl_block_list_initialize(cl_block_list *list) { + list->count = 0; + cl_block_list_struct_initialize(&list->ht, NULL); +} + +/* Returns head of *this, or NULL if empty. */ +static cl_block *cl_block_list_head(cl_block_list *list) { + return list->ht.next->block; +} + +/* Insert element *e after *pos. */ +static void cl_block_list_insert(cl_block_list *list, cl_block_list_struct *pos, + cl_block_list_struct *e) { + list->count++; + e->next = pos->next; + e->prev = pos; + e->next->prev = e; + e->prev->next = e; +} + +/* Insert block at the head of the list */ +static void cl_block_list_insert_at_head(cl_block_list *list, cl_block *block) { + cl_block_list_insert(list, &list->ht, &block->link); +} + +/* Insert block at the tail of the list */ +static void cl_block_list_insert_at_tail(cl_block_list *list, cl_block *block) { + cl_block_list_insert(list, list->ht.prev, &block->link); +} + +/* Removes block *b. Requires *b be in the list. */ +static void cl_block_list_remove(cl_block_list *list, cl_block *b) { + list->count--; + b->link.next->prev = b->link.prev; + b->link.prev->next = b->link.next; +} + +/* Functions that operate on cl_block's */ + +static void cl_block_initialize(cl_block *block, char *buffer) { + block->buffer = buffer; + gpr_atm_rel_store(&block->writer_lock, 0); + gpr_atm_rel_store(&block->reader_lock, 0); + gpr_atm_rel_store(&block->bytes_committed, 0); + block->bytes_read = 0; + cl_block_list_struct_initialize(&block->link, block); +} + +/* Guards against exposing partially written buffer to the reader. */ +static void cl_block_set_bytes_committed(cl_block *block, + int32_t bytes_committed) { + gpr_atm_rel_store(&block->bytes_committed, bytes_committed); +} + +static int32_t cl_block_get_bytes_committed(cl_block *block) { + return gpr_atm_acq_load(&block->bytes_committed); +} + +/* Tries to disable future read/write access to this block. Succeeds if: + - no in-progress write AND + - no in-progress read AND + - 'discard_data' set to true OR no unread data + On success, clears the block state and returns with writer_lock_ and + reader_lock_ held. These locks are released by a subsequent + cl_block_access_enable() call. */ +static int cl_block_try_disable_access(cl_block *block, int discard_data) { + if (!cl_try_lock(&block->writer_lock)) { + return 0; + } + if (!cl_try_lock(&block->reader_lock)) { + cl_unlock(&block->writer_lock); + return 0; + } + if (!discard_data && + (block->bytes_read != cl_block_get_bytes_committed(block))) { + cl_unlock(&block->reader_lock); + cl_unlock(&block->writer_lock); + return 0; + } + cl_block_set_bytes_committed(block, 0); + block->bytes_read = 0; + return 1; +} + +static void cl_block_enable_access(cl_block *block) { + cl_unlock(&block->reader_lock); + cl_unlock(&block->writer_lock); +} + +/* Returns with writer_lock held. */ +static void *cl_block_start_write(cl_block *block, size_t size) { + int32_t bytes_committed; + if (!cl_try_lock(&block->writer_lock)) { + return NULL; + } + bytes_committed = cl_block_get_bytes_committed(block); + if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) { + cl_unlock(&block->writer_lock); + return NULL; + } + return block->buffer + bytes_committed; +} + +/* Releases writer_lock and increments committed bytes by 'bytes_written'. + 'bytes_written' must be <= 'size' specified in the corresponding + StartWrite() call. This function is thread-safe. */ +static void cl_block_end_write(cl_block *block, size_t bytes_written) { + cl_block_set_bytes_committed( + block, cl_block_get_bytes_committed(block) + bytes_written); + cl_unlock(&block->writer_lock); +} + +/* Returns a pointer to the first unread byte in buffer. The number of bytes + available are returned in 'bytes_available'. Acquires reader lock that is + released by a subsequent cl_block_end_read() call. Returns NULL if: + - read in progress + - no data available */ +static void *cl_block_start_read(cl_block *block, size_t *bytes_available) { + void *record; + if (!cl_try_lock(&block->reader_lock)) { + return NULL; + } + /* bytes_committed may change from under us. Use bytes_available to update + bytes_read below. */ + *bytes_available = cl_block_get_bytes_committed(block) - block->bytes_read; + if (*bytes_available == 0) { + cl_unlock(&block->reader_lock); + return NULL; + } + record = block->buffer + block->bytes_read; + block->bytes_read += *bytes_available; + return record; +} + +static void cl_block_end_read(cl_block *block) { + cl_unlock(&block->reader_lock); +} + +/* Internal functions operating on g_log */ + +/* Allocates a new free block (or recycles an available dirty block if log is + configured to discard old records). Returns NULL if out-of-space. */ +static cl_block *cl_allocate_block(void) { + cl_block *block = cl_block_list_head(&g_log.free_block_list); + if (block != NULL) { + cl_block_list_remove(&g_log.free_block_list, block); + return block; + } + if (!g_log.discard_old_records) { + /* No free block and log is configured to keep old records. */ + return NULL; + } + /* Recycle dirty block. Start from the oldest. */ + for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL; + block = block->link.next->block) { + if (cl_block_try_disable_access(block, 1 /* discard data */)) { + cl_block_list_remove(&g_log.dirty_block_list, block); + return block; + } + } + return NULL; +} + +/* Allocates a new block and updates core id => block mapping. 'old_block' + points to the block that the caller thinks is attached to + 'core_id'. 'old_block' may be NULL. Returns non-zero if: + - allocated a new block OR + - 'core_id' => 'old_block' mapping changed (another thread allocated a + block before lock was acquired). */ +static int cl_allocate_core_local_block(int32_t core_id, cl_block *old_block) { + /* Now that we have the lock, check if core-local mapping has changed. */ + cl_core_local_block *core_local_block = &g_log.core_local_blocks[core_id]; + cl_block *block = cl_core_local_block_get_block(core_local_block); + if ((block != NULL) && (block != old_block)) { + return 1; + } + if (block != NULL) { + cl_core_local_block_set_block(core_local_block, NULL); + cl_block_list_insert_at_tail(&g_log.dirty_block_list, block); + } + block = cl_allocate_block(); + if (block == NULL) { + gpr_atm_rel_store(&g_log.is_full, 1); + return 0; + } + cl_core_local_block_set_block(core_local_block, block); + cl_block_enable_access(block); + return 1; +} + +static cl_block *cl_get_block(void *record) { + uintptr_t p = (uintptr_t)((char *)record - g_log.buffer); + uintptr_t index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE; + return &g_log.blocks[index]; +} + +/* Gets the next block to read and tries to free 'prev' block (if not NULL). + Returns NULL if reached the end. */ +static cl_block *cl_next_block_to_read(cl_block *prev) { + cl_block *block = NULL; + if (g_log.read_iterator_state == g_log.num_cores) { + /* We are traversing dirty list; find the next dirty block. */ + if (prev != NULL) { + /* Try to free the previous block if there is no unread data. This block + may have unread data if previously incomplete record completed between + read_next() calls. */ + block = prev->link.next->block; + if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) { + cl_block_list_remove(&g_log.dirty_block_list, prev); + cl_block_list_insert_at_head(&g_log.free_block_list, prev); + gpr_atm_rel_store(&g_log.is_full, 0); + } + } else { + block = cl_block_list_head(&g_log.dirty_block_list); + } + if (block != NULL) { + return block; + } + /* We are done with the dirty list; moving on to core-local blocks. */ + } + while (g_log.read_iterator_state > 0) { + g_log.read_iterator_state--; + block = cl_core_local_block_get_block( + &g_log.core_local_blocks[g_log.read_iterator_state]); + if (block != NULL) { + return block; + } + } + return NULL; +} + +/* External functions: primary stats_log interface */ +void census_log_initialize(size_t size_in_mb, int discard_old_records) { + int32_t ix; + /* Check cacheline alignment. */ + GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0); + GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0); + GPR_ASSERT(!g_log.initialized); + g_log.discard_old_records = discard_old_records; + g_log.num_cores = gpr_cpu_num_cores(); + /* Ensure at least as many blocks as there are cores. */ + g_log.num_blocks = GPR_MAX( + g_log.num_cores, (size_in_mb << 20) >> CENSUS_LOG_2_MAX_RECORD_SIZE); + gpr_mu_init(&g_log.lock); + g_log.read_iterator_state = 0; + g_log.block_being_read = NULL; + gpr_atm_rel_store(&g_log.is_full, 0); + g_log.core_local_blocks = (cl_core_local_block *)gpr_malloc_aligned( + g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG); + memset(g_log.core_local_blocks, 0, + g_log.num_cores * sizeof(cl_core_local_block)); + g_log.blocks = (cl_block *)gpr_malloc_aligned( + g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE_LOG); + memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block)); + g_log.buffer = gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE); + memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE); + cl_block_list_initialize(&g_log.free_block_list); + cl_block_list_initialize(&g_log.dirty_block_list); + for (ix = 0; ix < g_log.num_blocks; ++ix) { + cl_block *block = g_log.blocks + ix; + cl_block_initialize(block, + g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * ix)); + cl_block_try_disable_access(block, 1 /* discard data */); + cl_block_list_insert_at_tail(&g_log.free_block_list, block); + } + gpr_atm_rel_store(&g_log.out_of_space_count, 0); + g_log.initialized = 1; +} + +void census_log_shutdown(void) { + GPR_ASSERT(g_log.initialized); + gpr_mu_destroy(&g_log.lock); + gpr_free_aligned(g_log.core_local_blocks); + g_log.core_local_blocks = NULL; + gpr_free_aligned(g_log.blocks); + g_log.blocks = NULL; + gpr_free(g_log.buffer); + g_log.buffer = NULL; + g_log.initialized = 0; +} + +void *census_log_start_write(size_t size) { + /* Used to bound number of times block allocation is attempted. */ + int32_t attempts_remaining = g_log.num_blocks; + /* TODO(aveitch): move this inside the do loop when current_cpu is fixed */ + int32_t core_id = gpr_cpu_current_cpu(); + GPR_ASSERT(g_log.initialized); + if (size > CENSUS_LOG_MAX_RECORD_SIZE) { + return NULL; + } + do { + int allocated; + void *record = NULL; + cl_block *block = + cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]); + if (block && (record = cl_block_start_write(block, size))) { + return record; + } + /* Need to allocate a new block. We are here if: + - No block associated with the core OR + - Write in-progress on the block OR + - block is out of space */ + if (gpr_atm_acq_load(&g_log.is_full)) { + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; + } + gpr_mu_lock(&g_log.lock); + allocated = cl_allocate_core_local_block(core_id, block); + gpr_mu_unlock(&g_log.lock); + if (!allocated) { + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; + } + } while (attempts_remaining--); + /* Give up. */ + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; +} + +void census_log_end_write(void *record, size_t bytes_written) { + GPR_ASSERT(g_log.initialized); + cl_block_end_write(cl_get_block(record), bytes_written); +} + +void census_log_init_reader(void) { + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + /* If a block is locked for reading unlock it. */ + if (g_log.block_being_read != NULL) { + cl_block_end_read(g_log.block_being_read); + g_log.block_being_read = NULL; + } + g_log.read_iterator_state = g_log.num_cores; + gpr_mu_unlock(&g_log.lock); +} + +const void *census_log_read_next(size_t *bytes_available) { + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + if (g_log.block_being_read != NULL) { + cl_block_end_read(g_log.block_being_read); + } + do { + g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read); + if (g_log.block_being_read != NULL) { + void *record = + cl_block_start_read(g_log.block_being_read, bytes_available); + if (record != NULL) { + gpr_mu_unlock(&g_log.lock); + return record; + } + } + } while (g_log.block_being_read != NULL); + gpr_mu_unlock(&g_log.lock); + return NULL; +} + +size_t census_log_remaining_space(void) { + size_t space; + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + if (g_log.discard_old_records) { + /* Remaining space is not meaningful; just return the entire log space. */ + space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE; + } else { + space = g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE; + } + gpr_mu_unlock(&g_log.lock); + return space; +} + +int census_log_out_of_space_count(void) { + GPR_ASSERT(g_log.initialized); + return gpr_atm_acq_load(&g_log.out_of_space_count); +} diff --git a/src/core/ext/census/census_log.h b/src/core/ext/census/census_log.h new file mode 100644 index 0000000000..534ecc5705 --- /dev/null +++ b/src/core/ext/census/census_log.h @@ -0,0 +1,91 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_LOG_H +#define GRPC_CORE_EXT_CENSUS_CENSUS_LOG_H + +#include <stddef.h> + +/* Maximum record size, in bytes. */ +#define CENSUS_LOG_2_MAX_RECORD_SIZE 14 /* 2^14 = 16KB */ +#define CENSUS_LOG_MAX_RECORD_SIZE (1 << CENSUS_LOG_2_MAX_RECORD_SIZE) + +/* Initialize the statistics logging subsystem with the given log size. A log + size of 0 will result in the smallest possible log for the platform + (approximately CENSUS_LOG_MAX_RECORD_SIZE * gpr_cpu_num_cores()). If + discard_old_records is non-zero, then new records will displace older ones + when the log is full. This function must be called before any other + census_log functions. +*/ +void census_log_initialize(size_t size_in_mb, int discard_old_records); + +/* Shutdown the logging subsystem. Caller must ensure that: + - no in progress or future call to any census_log functions + - no incomplete records +*/ +void census_log_shutdown(void); + +/* Allocates and returns a 'size' bytes record and marks it in use. A + subsequent census_log_end_write() marks the record complete. The + 'bytes_written' census_log_end_write() argument must be <= + 'size'. Returns NULL if out-of-space AND: + - log is configured to keep old records OR + - all blocks are pinned by incomplete records. +*/ +void *census_log_start_write(size_t size); + +void census_log_end_write(void *record, size_t bytes_written); + +/* census_log_read_next() iterates over blocks with data and for each block + returns a pointer to the first unread byte. The number of bytes that can be + read are returned in 'bytes_available'. Reader is expected to read all + available data. Reading the data consumes it i.e. it cannot be read again. + census_log_read_next() returns NULL if the end is reached i.e last block + is read. census_log_init_reader() starts the iteration or aborts the + current iteration. +*/ +void census_log_init_reader(void); +const void *census_log_read_next(size_t *bytes_available); + +/* Returns estimated remaining space across all blocks, in bytes. If log is + configured to discard old records, returns total log space. Otherwise, + returns space available in empty blocks (partially filled blocks are + treated as full). +*/ +size_t census_log_remaining_space(void); + +/* Returns the number of times gprc_stats_log_start_write() failed due to + out-of-space. */ +int census_log_out_of_space_count(void); + +#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_LOG_H */ diff --git a/src/core/ext/census/census_rpc_stats.c b/src/core/ext/census/census_rpc_stats.c new file mode 100644 index 0000000000..09ee12d54b --- /dev/null +++ b/src/core/ext/census/census_rpc_stats.c @@ -0,0 +1,253 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include "src/core/ext/census/census_interface.h" +#include "src/core/ext/census/census_rpc_stats.h" +#include "src/core/ext/census/census_tracing.h" +#include "src/core/ext/census/hash_table.h" +#include "src/core/ext/census/window_stats.h" +#include "src/core/lib/support/murmur_hash.h" +#include "src/core/lib/support/string.h" + +#define NUM_INTERVALS 3 +#define MINUTE_INTERVAL 0 +#define HOUR_INTERVAL 1 +#define TOTAL_INTERVAL 2 + +/* for easier typing */ +typedef census_per_method_rpc_stats per_method_stats; + +/* Ensure mu is only initialized once. */ +static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT; +/* Guards two stats stores. */ +static gpr_mu g_mu; +static census_ht *g_client_stats_store = NULL; +static census_ht *g_server_stats_store = NULL; + +static void init_mutex(void) { gpr_mu_init(&g_mu); } + +static void init_mutex_once(void) { + gpr_once_init(&g_stats_store_mu_init, init_mutex); +} + +static int cmp_str_keys(const void *k1, const void *k2) { + return strcmp((const char *)k1, (const char *)k2); +} + +/* TODO(hongyu): replace it with cityhash64 */ +static uint64_t simple_hash(const void *k) { + size_t len = strlen(k); + uint64_t higher = gpr_murmur_hash3((const char *)k, len / 2, 0); + return higher << 32 | + gpr_murmur_hash3((const char *)k + len / 2, len - len / 2, 0); +} + +static void delete_stats(void *stats) { + census_window_stats_destroy((struct census_window_stats *)stats); +} + +static void delete_key(void *key) { gpr_free(key); } + +static const census_ht_option ht_opt = { + CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */, + simple_hash /* hash function */, cmp_str_keys /* key comparator */, + delete_stats /* data deleter */, delete_key /* key deleter */ +}; + +static void init_rpc_stats(void *stats) { + memset(stats, 0, sizeof(census_rpc_stats)); +} + +static void stat_add_proportion(double p, void *base, const void *addme) { + census_rpc_stats *b = (census_rpc_stats *)base; + census_rpc_stats *a = (census_rpc_stats *)addme; + b->cnt += p * a->cnt; + b->rpc_error_cnt += p * a->rpc_error_cnt; + b->app_error_cnt += p * a->app_error_cnt; + b->elapsed_time_ms += p * a->elapsed_time_ms; + b->api_request_bytes += p * a->api_request_bytes; + b->wire_request_bytes += p * a->wire_request_bytes; + b->api_response_bytes += p * a->api_response_bytes; + b->wire_response_bytes += p * a->wire_response_bytes; +} + +static void stat_add(void *base, const void *addme) { + stat_add_proportion(1.0, base, addme); +} + +static gpr_timespec min_hour_total_intervals[3] = { + {60, 0}, {3600, 0}, {36000000, 0}}; + +static const census_window_stats_stat_info window_stats_settings = { + sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion}; + +census_rpc_stats *census_rpc_stats_create_empty(void) { + census_rpc_stats *ret = + (census_rpc_stats *)gpr_malloc(sizeof(census_rpc_stats)); + memset(ret, 0, sizeof(census_rpc_stats)); + return ret; +} + +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats *data) { + int i = 0; + for (i = 0; i < data->num_entries; i++) { + if (data->stats[i].method != NULL) { + gpr_free((void *)data->stats[i].method); + } + } + if (data->stats != NULL) { + gpr_free(data->stats); + } + data->num_entries = 0; + data->stats = NULL; +} + +static void record_stats(census_ht *store, census_op_id op_id, + const census_rpc_stats *stats) { + gpr_mu_lock(&g_mu); + if (store != NULL) { + census_trace_obj *trace = NULL; + census_internal_lock_trace_store(); + trace = census_get_trace_obj_locked(op_id); + if (trace != NULL) { + const char *method_name = census_get_trace_method_name(trace); + struct census_window_stats *window_stats = NULL; + census_ht_key key; + key.ptr = (void *)method_name; + window_stats = census_ht_find(store, key); + census_internal_unlock_trace_store(); + if (window_stats == NULL) { + window_stats = census_window_stats_create(3, min_hour_total_intervals, + 30, &window_stats_settings); + key.ptr = gpr_strdup(key.ptr); + census_ht_insert(store, key, (void *)window_stats); + } + census_window_stats_add(window_stats, gpr_now(GPR_CLOCK_REALTIME), stats); + } else { + census_internal_unlock_trace_store(); + } + } + gpr_mu_unlock(&g_mu); +} + +void census_record_rpc_client_stats(census_op_id op_id, + const census_rpc_stats *stats) { + record_stats(g_client_stats_store, op_id, stats); +} + +void census_record_rpc_server_stats(census_op_id op_id, + const census_rpc_stats *stats) { + record_stats(g_server_stats_store, op_id, stats); +} + +/* Get stats from input stats store */ +static void get_stats(census_ht *store, census_aggregated_rpc_stats *data) { + GPR_ASSERT(data != NULL); + if (data->num_entries != 0) { + census_aggregated_rpc_stats_set_empty(data); + } + gpr_mu_lock(&g_mu); + if (store != NULL) { + size_t n; + unsigned i, j; + gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + census_ht_kv *kv = census_ht_get_all_elements(store, &n); + if (kv != NULL) { + data->num_entries = n; + data->stats = + (per_method_stats *)gpr_malloc(sizeof(per_method_stats) * n); + for (i = 0; i < n; i++) { + census_window_stats_sums sums[NUM_INTERVALS]; + for (j = 0; j < NUM_INTERVALS; j++) { + sums[j].statistic = (void *)census_rpc_stats_create_empty(); + } + data->stats[i].method = gpr_strdup(kv[i].k.ptr); + census_window_stats_get_sums(kv[i].v, now, sums); + data->stats[i].minute_stats = + *(census_rpc_stats *)sums[MINUTE_INTERVAL].statistic; + data->stats[i].hour_stats = + *(census_rpc_stats *)sums[HOUR_INTERVAL].statistic; + data->stats[i].total_stats = + *(census_rpc_stats *)sums[TOTAL_INTERVAL].statistic; + for (j = 0; j < NUM_INTERVALS; j++) { + gpr_free(sums[j].statistic); + } + } + gpr_free(kv); + } + } + gpr_mu_unlock(&g_mu); +} + +void census_get_client_stats(census_aggregated_rpc_stats *data) { + get_stats(g_client_stats_store, data); +} + +void census_get_server_stats(census_aggregated_rpc_stats *data) { + get_stats(g_server_stats_store, data); +} + +void census_stats_store_init(void) { + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store == NULL && g_server_stats_store == NULL) { + g_client_stats_store = census_ht_create(&ht_opt); + g_server_stats_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census stats store already initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_stats_store_shutdown(void) { + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_client_stats_store != NULL) { + census_ht_destroy(g_client_stats_store); + g_client_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census server stats store not initialized."); + } + if (g_server_stats_store != NULL) { + census_ht_destroy(g_server_stats_store); + g_server_stats_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census client stats store not initialized."); + } + gpr_mu_unlock(&g_mu); +} diff --git a/src/core/ext/census/census_rpc_stats.h b/src/core/ext/census/census_rpc_stats.h new file mode 100644 index 0000000000..7e4d8d1640 --- /dev/null +++ b/src/core/ext/census/census_rpc_stats.h @@ -0,0 +1,101 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_RPC_STATS_H +#define GRPC_CORE_EXT_CENSUS_CENSUS_RPC_STATS_H + +#include <grpc/support/port_platform.h> +#include "src/core/ext/census/census_interface.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct census_rpc_stats { + uint64_t cnt; + uint64_t rpc_error_cnt; + uint64_t app_error_cnt; + double elapsed_time_ms; + double api_request_bytes; + double wire_request_bytes; + double api_response_bytes; + double wire_response_bytes; +}; + +/* Creates an empty rpc stats object on heap. */ +census_rpc_stats *census_rpc_stats_create_empty(void); + +typedef struct census_per_method_rpc_stats { + const char *method; + census_rpc_stats minute_stats; /* cumulative stats in the past minute */ + census_rpc_stats hour_stats; /* cumulative stats in the past hour */ + census_rpc_stats total_stats; /* cumulative stats from last gc */ +} census_per_method_rpc_stats; + +typedef struct census_aggregated_rpc_stats { + int num_entries; + census_per_method_rpc_stats *stats; +} census_aggregated_rpc_stats; + +/* Initializes an aggregated rpc stats object to an empty state. */ +void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats *data); + +/* Records client side stats of a rpc. */ +void census_record_rpc_client_stats(census_op_id op_id, + const census_rpc_stats *stats); + +/* Records server side stats of a rpc. */ +void census_record_rpc_server_stats(census_op_id op_id, + const census_rpc_stats *stats); + +/* The following two functions are intended for inprocess query of + per-service per-method stats from grpc implementations. */ + +/* Populates *data_map with server side aggregated per-service per-method + stats. + DO NOT CALL from outside of grpc code. */ +void census_get_server_stats(census_aggregated_rpc_stats *data_map); + +/* Populates *data_map with client side aggregated per-service per-method + stats. + DO NOT CALL from outside of grpc code. */ +void census_get_client_stats(census_aggregated_rpc_stats *data_map); + +void census_stats_store_init(void); +void census_stats_store_shutdown(void); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_RPC_STATS_H */ diff --git a/src/core/ext/census/census_tracing.c b/src/core/ext/census/census_tracing.c new file mode 100644 index 0000000000..f893dc9864 --- /dev/null +++ b/src/core/ext/census/census_tracing.c @@ -0,0 +1,241 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/census_tracing.h" +#include "src/core/ext/census/census_interface.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> +#include "src/core/ext/census/hash_table.h" +#include "src/core/lib/support/string.h" + +void census_trace_obj_destroy(census_trace_obj *obj) { + census_trace_annotation *p = obj->annotations; + while (p != NULL) { + census_trace_annotation *next = p->next; + gpr_free(p); + p = next; + } + gpr_free(obj->method); + gpr_free(obj); +} + +static void delete_trace_obj(void *obj) { + census_trace_obj_destroy((census_trace_obj *)obj); +} + +static const census_ht_option ht_opt = { + CENSUS_HT_UINT64 /* key type */, + 571 /* n_of_buckets */, + NULL /* hash */, + NULL /* compare_keys */, + delete_trace_obj /* delete data */, + NULL /* delete key */ +}; + +static gpr_once g_init_mutex_once = GPR_ONCE_INIT; +static gpr_mu g_mu; /* Guards following two static variables. */ +static census_ht *g_trace_store = NULL; +static uint64_t g_id = 0; + +static census_ht_key op_id_as_key(census_op_id *id) { + return *(census_ht_key *)id; +} + +static uint64_t op_id_2_uint64(census_op_id *id) { + uint64_t ret; + memcpy(&ret, id, sizeof(census_op_id)); + return ret; +} + +static void init_mutex(void) { gpr_mu_init(&g_mu); } + +static void init_mutex_once(void) { + gpr_once_init(&g_init_mutex_once, init_mutex); +} + +census_op_id census_tracing_start_op(void) { + gpr_mu_lock(&g_mu); + { + census_trace_obj *ret = gpr_malloc(sizeof(census_trace_obj)); + memset(ret, 0, sizeof(census_trace_obj)); + g_id++; + memcpy(&ret->id, &g_id, sizeof(census_op_id)); + ret->rpc_stats.cnt = 1; + ret->ts = gpr_now(GPR_CLOCK_REALTIME); + census_ht_insert(g_trace_store, op_id_as_key(&ret->id), (void *)ret); + gpr_log(GPR_DEBUG, "Start tracing for id %lu", g_id); + gpr_mu_unlock(&g_mu); + return ret->id; + } +} + +int census_add_method_tag(census_op_id op_id, const char *method) { + int ret = 0; + census_trace_obj *trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace == NULL) { + ret = 1; + } else { + trace->method = gpr_strdup(method); + } + gpr_mu_unlock(&g_mu); + return ret; +} + +void census_tracing_print(census_op_id op_id, const char *anno_txt) { + census_trace_obj *trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace != NULL) { + census_trace_annotation *anno = gpr_malloc(sizeof(census_trace_annotation)); + anno->ts = gpr_now(GPR_CLOCK_REALTIME); + { + char *d = anno->txt; + const char *s = anno_txt; + int n = 0; + for (; n < CENSUS_MAX_ANNOTATION_LENGTH && *s != '\0'; ++n) { + *d++ = *s++; + } + *d = '\0'; + } + anno->next = trace->annotations; + trace->annotations = anno; + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_end_op(census_op_id op_id) { + census_trace_obj *trace = NULL; + gpr_mu_lock(&g_mu); + trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); + if (trace != NULL) { + trace->rpc_stats.elapsed_time_ms = gpr_timespec_to_micros( + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), trace->ts)); + gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us", + op_id_2_uint64(&op_id), trace->method, + trace->rpc_stats.elapsed_time_ms); + census_ht_erase(g_trace_store, op_id_as_key(&op_id)); + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_init(void) { + init_mutex_once(); + gpr_mu_lock(&g_mu); + if (g_trace_store == NULL) { + g_id = 1; + g_trace_store = census_ht_create(&ht_opt); + } else { + gpr_log(GPR_ERROR, "Census trace store already initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_tracing_shutdown(void) { + gpr_mu_lock(&g_mu); + if (g_trace_store != NULL) { + census_ht_destroy(g_trace_store); + g_trace_store = NULL; + } else { + gpr_log(GPR_ERROR, "Census trace store is not initialized."); + } + gpr_mu_unlock(&g_mu); +} + +void census_internal_lock_trace_store(void) { gpr_mu_lock(&g_mu); } + +void census_internal_unlock_trace_store(void) { gpr_mu_unlock(&g_mu); } + +census_trace_obj *census_get_trace_obj_locked(census_op_id op_id) { + if (g_trace_store == NULL) { + gpr_log(GPR_ERROR, "Census trace store is not initialized."); + return NULL; + } + return (census_trace_obj *)census_ht_find(g_trace_store, + op_id_as_key(&op_id)); +} + +const char *census_get_trace_method_name(const census_trace_obj *trace) { + return trace->method; +} + +static census_trace_annotation *dup_annotation_chain( + census_trace_annotation *from) { + census_trace_annotation *ret = NULL; + census_trace_annotation **to = &ret; + for (; from != NULL; from = from->next) { + *to = gpr_malloc(sizeof(census_trace_annotation)); + memcpy(*to, from, sizeof(census_trace_annotation)); + to = &(*to)->next; + } + return ret; +} + +static census_trace_obj *trace_obj_dup(census_trace_obj *from) { + census_trace_obj *to = NULL; + GPR_ASSERT(from != NULL); + to = gpr_malloc(sizeof(census_trace_obj)); + to->id = from->id; + to->ts = from->ts; + to->rpc_stats = from->rpc_stats; + to->method = gpr_strdup(from->method); + to->annotations = dup_annotation_chain(from->annotations); + return to; +} + +census_trace_obj **census_get_active_ops(int *num_active_ops) { + census_trace_obj **ret = NULL; + gpr_mu_lock(&g_mu); + if (g_trace_store != NULL) { + size_t n = 0; + census_ht_kv *all_kvs = census_ht_get_all_elements(g_trace_store, &n); + *num_active_ops = (int)n; + if (n != 0) { + size_t i = 0; + ret = gpr_malloc(sizeof(census_trace_obj *) * n); + for (i = 0; i < n; i++) { + ret[i] = trace_obj_dup((census_trace_obj *)all_kvs[i].v); + } + } + gpr_free(all_kvs); + } + gpr_mu_unlock(&g_mu); + return ret; +} diff --git a/src/core/ext/census/census_tracing.h b/src/core/ext/census/census_tracing.h new file mode 100644 index 0000000000..42a0d7403e --- /dev/null +++ b/src/core/ext/census/census_tracing.h @@ -0,0 +1,96 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_TRACING_H +#define GRPC_CORE_EXT_CENSUS_CENSUS_TRACING_H + +#include <grpc/support/time.h> +#include "src/core/ext/census/census_rpc_stats.h" + +/* WARNING: The data structures and APIs provided by this file are for GRPC + library's internal use ONLY. They might be changed in backward-incompatible + ways and are not subject to any deprecation policy. + They are not recommended for external use. + */ +#ifdef __cplusplus +extern "C" { +#endif + +/* Struct for a trace annotation. */ +typedef struct census_trace_annotation { + gpr_timespec ts; /* timestamp of the annotation */ + char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */ + struct census_trace_annotation *next; +} census_trace_annotation; + +typedef struct census_trace_obj { + census_op_id id; + gpr_timespec ts; + census_rpc_stats rpc_stats; + char *method; + census_trace_annotation *annotations; +} census_trace_obj; + +/* Deletes trace object. */ +void census_trace_obj_destroy(census_trace_obj *obj); + +/* Initializes trace store. This function is thread safe. */ +void census_tracing_init(void); + +/* Shutsdown trace store. This function is thread safe. */ +void census_tracing_shutdown(void); + +/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store + is not initialized or trace obj is not found. Requires trace store being + locked before calling this function. */ +census_trace_obj *census_get_trace_obj_locked(census_op_id op_id); + +/* The following two functions acquire and release the trace store global lock. + They are for census internal use only. */ +void census_internal_lock_trace_store(void); +void census_internal_unlock_trace_store(void); + +/* Gets method name associated with the input trace object. */ +const char *census_get_trace_method_name(const census_trace_obj *trace); + +/* Returns an array of pointers to trace objects of currently active operations + and fills in number of active operations. Returns NULL if there are no active + operations. + Caller owns the returned objects. */ +census_trace_obj **census_get_active_ops(int *num_active_ops); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_TRACING_H */ diff --git a/src/core/ext/census/context.c b/src/core/ext/census/context.c new file mode 100644 index 0000000000..0dfc4ecbf1 --- /dev/null +++ b/src/core/ext/census/context.c @@ -0,0 +1,509 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/useful.h> +#include <stdbool.h> +#include <string.h> +#include "src/core/lib/support/string.h" + +// Functions in this file support the public context API, including +// encoding/decoding as part of context propagation across RPC's. The overall +// requirements (in approximate priority order) for the +// context representation: +// 1. Efficient conversion to/from wire format +// 2. Minimal bytes used on-wire +// 3. Efficient context creation +// 4. Efficient lookup of tag value for a key +// 5. Efficient iteration over tags +// 6. Minimal memory footprint +// +// Notes on tradeoffs/decisions: +// * tag includes 1 byte length of key, as well as nil-terminating byte. These +// are to aid in efficient parsing and the ability to directly return key +// strings. This is more important than saving a single byte/tag on the wire. +// * The wire encoding uses only single byte values. This eliminates the need +// to handle endian-ness conversions. It also means there is a hard upper +// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS. +// * Keep all tag information (keys/values/flags) in a single memory buffer, +// that can be directly copied to the wire. + +// min and max valid chars in tag keys and values. All printable ASCII is OK. +#define MIN_VALID_TAG_CHAR 32 // ' ' +#define MAX_VALID_TAG_CHAR 126 // '~' + +// Structure representing a set of tags. Essentially a count of number of tags +// present, and pointer to a chunk of memory that contains the per-tag details. +struct tag_set { + int ntags; // number of tags. + int ntags_alloc; // ntags + number of deleted tags (total number of tags + // in all of kvm). This will always be == ntags, except during the process + // of building a new tag set. + size_t kvm_size; // number of bytes allocated for key/value storage. + size_t kvm_used; // number of bytes of used key/value memory + char *kvm; // key/value memory. Consists of repeated entries of: + // Offset Size Description + // 0 1 Key length, including trailing 0. (K) + // 1 1 Value length, including trailing 0 (V) + // 2 1 Flags + // 3 K Key bytes + // 3 + K V Value bytes + // + // We refer to the first 3 entries as the 'tag header'. If extra values are + // introduced in the header, you will need to modify the TAG_HEADER_SIZE + // constant, the raw_tag structure (and everything that uses it) and the + // encode/decode functions appropriately. +}; + +// Number of bytes in tag header. +#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1) +// Offsets to tag header entries. +#define KEY_LEN_OFFSET 0 +#define VALUE_LEN_OFFSET 1 +#define FLAG_OFFSET 2 + +// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set. +struct raw_tag { + uint8_t key_len; + uint8_t value_len; + uint8_t flags; + char *key; + char *value; +}; + +// Use a reserved flag bit for indication of deleted tag. +#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED +#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED) + +// Primary representation of a context. Composed of 2 underlying tag_set +// structs, one each for propagated and local (non-propagated) tags. This is +// to efficiently support tag encoding/decoding. +// TODO(aveitch): need to add tracing id's/structure. +struct census_context { + struct tag_set tags[2]; + census_context_status status; +}; + +// Indices into the tags member of census_context +#define PROPAGATED_TAGS 0 +#define LOCAL_TAGS 1 + +// Validate (check all characters are in range and size is less than limit) a +// key or value string. Returns 0 if the string is invalid, or the length +// (including terminator) if valid. +static size_t validate_tag(const char *kv) { + size_t len = 1; + char ch; + while ((ch = *kv++) != 0) { + if (ch < MIN_VALID_TAG_CHAR || ch > MAX_VALID_TAG_CHAR) { + return 0; + } + len++; + } + if (len > CENSUS_MAX_TAG_KV_LEN) { + return 0; + } + return len; +} + +// Extract a raw tag given a pointer (raw) to the tag header. Allow for some +// extra bytes in the tag header (see encode/decode functions for usage: this +// allows for future expansion of the tag header). +static char *decode_tag(struct raw_tag *tag, char *header, int offset) { + tag->key_len = (uint8_t)(*header++); + tag->value_len = (uint8_t)(*header++); + tag->flags = (uint8_t)(*header++); + header += offset; + tag->key = header; + header += tag->key_len; + tag->value = header; + return header + tag->value_len; +} + +// Make a copy (in 'to') of an existing tag_set. +static void tag_set_copy(struct tag_set *to, const struct tag_set *from) { + memcpy(to, from, sizeof(struct tag_set)); + to->kvm = gpr_malloc(to->kvm_size); + memcpy(to->kvm, from->kvm, from->kvm_used); +} + +// Delete a tag from a tag_set, if it exists (returns true if it did). +static bool tag_set_delete_tag(struct tag_set *tags, const char *key, + size_t key_len) { + char *kvp = tags->kvm; + for (int i = 0; i < tags->ntags_alloc; i++) { + uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET); + struct raw_tag tag; + kvp = decode_tag(&tag, kvp, 0); + if (CENSUS_TAG_IS_DELETED(tag.flags)) continue; + if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) { + *flags |= CENSUS_TAG_DELETED; + tags->ntags--; + return true; + } + } + return false; +} + +// Delete a tag from a context, return true if it existed. +static bool context_delete_tag(census_context *context, const census_tag *tag, + size_t key_len) { + return ( + tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) || + tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len)); +} + +// Add a tag to a tag_set. Return true on success, false if the tag could +// not be added because of constraints on tag set size. This function should +// not be called if the tag may already exist (in a non-deleted state) in +// the tag_set, as that would result in two tags with the same key. +static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, + size_t key_len, size_t value_len) { + if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) { + return false; + } + const size_t tag_size = key_len + value_len + TAG_HEADER_SIZE; + if (tags->kvm_used + tag_size > tags->kvm_size) { + // allocate new memory if needed + tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE; + char *new_kvm = gpr_malloc(tags->kvm_size); + memcpy(new_kvm, tags->kvm, tags->kvm_used); + gpr_free(tags->kvm); + tags->kvm = new_kvm; + } + char *kvp = tags->kvm + tags->kvm_used; + *kvp++ = (char)key_len; + *kvp++ = (char)value_len; + // ensure reserved flags are not used. + *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS)); + memcpy(kvp, tag->key, key_len); + kvp += key_len; + memcpy(kvp, tag->value, value_len); + tags->kvm_used += tag_size; + tags->ntags++; + tags->ntags_alloc++; + return true; +} + +// Add/modify/delete a tag to/in a context. Caller must validate that tag key +// etc. are valid. +static void context_modify_tag(census_context *context, const census_tag *tag, + size_t key_len, size_t value_len) { + // First delete the tag if it is already present. + bool deleted = context_delete_tag(context, tag, key_len); + bool added = false; + if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { + added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len, + value_len); + } else { + added = + tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len, value_len); + } + + if (deleted) { + context->status.n_modified_tags++; + } else { + if (added) { + context->status.n_added_tags++; + } else { + context->status.n_ignored_tags++; + } + } +} + +// Remove memory used for deleted tags from a tag set. Basic algorithm: +// 1) Walk through tag set to find first deleted tag. Record where it is. +// 2) Find the next not-deleted tag. Copy all of kvm from there to the end +// "over" the deleted tags +// 3) repeat #1 and #2 until we have seen all tags +// 4) if we are still looking for a not-deleted tag, then all the end portion +// of the kvm is deleted. Just reduce the used amount of memory by the +// appropriate amount. +static void tag_set_flatten(struct tag_set *tags) { + if (tags->ntags == tags->ntags_alloc) return; + bool found_deleted = false; // found a deleted tag. + char *kvp = tags->kvm; + char *dbase = NULL; // record location of deleted tag + for (int i = 0; i < tags->ntags_alloc; i++) { + struct raw_tag tag; + char *next_kvp = decode_tag(&tag, kvp, 0); + if (found_deleted) { + if (!CENSUS_TAG_IS_DELETED(tag.flags)) { + ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags + GPR_ASSERT(reduce > 0); + ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp; + GPR_ASSERT(copy_size > 0); + memmove(dbase, kvp, (size_t)copy_size); + tags->kvm_used -= (size_t)reduce; + next_kvp -= reduce; + found_deleted = false; + } + } else { + if (CENSUS_TAG_IS_DELETED(tag.flags)) { + dbase = kvp; + found_deleted = true; + } + } + kvp = next_kvp; + } + if (found_deleted) { + GPR_ASSERT(dbase > tags->kvm); + tags->kvm_used = (size_t)(dbase - tags->kvm); + } + tags->ntags_alloc = tags->ntags; +} + +census_context *census_context_create(const census_context *base, + const census_tag *tags, int ntags, + census_context_status const **status) { + census_context *context = gpr_malloc(sizeof(census_context)); + // If we are given a base, copy it into our new tag set. Otherwise set it + // to zero/NULL everything. + if (base == NULL) { + memset(context, 0, sizeof(census_context)); + } else { + tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]); + tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]); + memset(&context->status, 0, sizeof(context->status)); + } + // Walk over the additional tags and, for those that aren't invalid, modify + // the context to add/replace/delete as required. + for (int i = 0; i < ntags; i++) { + const census_tag *tag = &tags[i]; + size_t key_len = validate_tag(tag->key); + // ignore the tag if it is invalid or too short. + if (key_len <= 1) { + context->status.n_invalid_tags++; + } else { + if (tag->value != NULL) { + size_t value_len = validate_tag(tag->value); + if (value_len != 0) { + context_modify_tag(context, tag, key_len, value_len); + } else { + context->status.n_invalid_tags++; + } + } else { + if (context_delete_tag(context, tag, key_len)) { + context->status.n_deleted_tags++; + } + } + } + } + // Remove any deleted tags, update status if needed, and return. + tag_set_flatten(&context->tags[PROPAGATED_TAGS]); + tag_set_flatten(&context->tags[LOCAL_TAGS]); + context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; + context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags; + if (status) { + *status = &context->status; + } + return context; +} + +const census_context_status *census_context_get_status( + const census_context *context) { + return &context->status; +} + +void census_context_destroy(census_context *context) { + gpr_free(context->tags[PROPAGATED_TAGS].kvm); + gpr_free(context->tags[LOCAL_TAGS].kvm); + gpr_free(context); +} + +void census_context_initialize_iterator(const census_context *context, + census_context_iterator *iterator) { + iterator->context = context; + iterator->index = 0; + if (context->tags[PROPAGATED_TAGS].ntags != 0) { + iterator->base = PROPAGATED_TAGS; + iterator->kvm = context->tags[PROPAGATED_TAGS].kvm; + } else if (context->tags[LOCAL_TAGS].ntags != 0) { + iterator->base = LOCAL_TAGS; + iterator->kvm = context->tags[LOCAL_TAGS].kvm; + } else { + iterator->base = -1; + } +} + +int census_context_next_tag(census_context_iterator *iterator, + census_tag *tag) { + if (iterator->base < 0) { + return 0; + } + struct raw_tag raw; + iterator->kvm = decode_tag(&raw, iterator->kvm, 0); + tag->key = raw.key; + tag->value = raw.value; + tag->flags = raw.flags; + if (++iterator->index == iterator->context->tags[iterator->base].ntags) { + do { + if (iterator->base == LOCAL_TAGS) { + iterator->base = -1; + return 1; + } + } while (iterator->context->tags[++iterator->base].ntags == 0); + iterator->index = 0; + iterator->kvm = iterator->context->tags[iterator->base].kvm; + } + return 1; +} + +// Find a tag in a tag_set by key. Return true if found, false otherwise. +static bool tag_set_get_tag(const struct tag_set *tags, const char *key, + size_t key_len, census_tag *tag) { + char *kvp = tags->kvm; + for (int i = 0; i < tags->ntags; i++) { + struct raw_tag raw; + kvp = decode_tag(&raw, kvp, 0); + if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) { + tag->key = raw.key; + tag->value = raw.value; + tag->flags = raw.flags; + return true; + } + } + return false; +} + +int census_context_get_tag(const census_context *context, const char *key, + census_tag *tag) { + size_t key_len = strlen(key) + 1; + if (key_len == 1) { + return 0; + } + if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) || + tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) { + return 1; + } + return 0; +} + +// Context encoding and decoding functions. +// +// Wire format for tag_set's on the wire: +// +// First, a tag set header: +// +// offset bytes description +// 0 1 version number +// 1 1 number of bytes in this header. This allows for future +// expansion. +// 2 1 number of bytes in each tag header. +// 3 1 ntags value from tag set. +// +// This is followed by the key/value memory from struct tag_set. + +#define ENCODED_VERSION 0 // Version number +#define ENCODED_HEADER_SIZE 4 // size of tag set header + +// Encode a tag set. Returns 0 if buffer is too small. +static size_t tag_set_encode(const struct tag_set *tags, char *buffer, + size_t buf_size) { + if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) { + return 0; + } + buf_size -= ENCODED_HEADER_SIZE; + *buffer++ = (char)ENCODED_VERSION; + *buffer++ = (char)ENCODED_HEADER_SIZE; + *buffer++ = (char)TAG_HEADER_SIZE; + *buffer++ = (char)tags->ntags; + if (tags->ntags == 0) { + return ENCODED_HEADER_SIZE; + } + memcpy(buffer, tags->kvm, tags->kvm_used); + return ENCODED_HEADER_SIZE + tags->kvm_used; +} + +size_t census_context_encode(const census_context *context, char *buffer, + size_t buf_size) { + return tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); +} + +// Decode a tag set. +static void tag_set_decode(struct tag_set *tags, const char *buffer, + size_t size) { + uint8_t version = (uint8_t)(*buffer++); + uint8_t header_size = (uint8_t)(*buffer++); + uint8_t tag_header_size = (uint8_t)(*buffer++); + tags->ntags = tags->ntags_alloc = (int)(*buffer++); + if (tags->ntags == 0) { + tags->ntags_alloc = 0; + tags->kvm_size = 0; + tags->kvm_used = 0; + tags->kvm = NULL; + return; + } + if (header_size != ENCODED_HEADER_SIZE) { + GPR_ASSERT(version != ENCODED_VERSION); + GPR_ASSERT(ENCODED_HEADER_SIZE < header_size); + buffer += (header_size - ENCODED_HEADER_SIZE); + } + tags->kvm_used = size - header_size; + tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN; + tags->kvm = gpr_malloc(tags->kvm_size); + if (tag_header_size != TAG_HEADER_SIZE) { + // something new in the tag information. I don't understand it, so + // don't copy it over. + GPR_ASSERT(version != ENCODED_VERSION); + GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE); + char *kvp = tags->kvm; + for (int i = 0; i < tags->ntags; i++) { + memcpy(kvp, buffer, TAG_HEADER_SIZE); + kvp += header_size; + struct raw_tag raw; + buffer = + decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE); + memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len); + kvp += raw.key_len + raw.value_len; + } + } else { + memcpy(tags->kvm, buffer, tags->kvm_used); + } +} + +census_context *census_context_decode(const char *buffer, size_t size) { + census_context *context = gpr_malloc(sizeof(census_context)); + memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set)); + if (buffer == NULL) { + memset(&context->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set)); + } else { + tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size); + } + memset(&context->status, 0, sizeof(context->status)); + context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; + return context; +} diff --git a/src/core/ext/census/grpc_context.c b/src/core/ext/census/grpc_context.c new file mode 100644 index 0000000000..98285ab2d5 --- /dev/null +++ b/src/core/ext/census/grpc_context.c @@ -0,0 +1,53 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> +#include <grpc/grpc.h> +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/call.h" + +void grpc_census_call_set_context(grpc_call *call, census_context *context) { + GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2, + (call, context)); + if (census_enabled() == CENSUS_FEATURE_NONE) { + return; + } + if (context != NULL) { + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, NULL); + } +} + +census_context *grpc_census_call_get_context(grpc_call *call) { + GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call)); + return (census_context *)grpc_call_context_get(call, GRPC_CONTEXT_TRACING); +} diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c new file mode 100644 index 0000000000..5e278ef127 --- /dev/null +++ b/src/core/ext/census/grpc_filter.c @@ -0,0 +1,198 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/grpc_filter.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/census.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/time.h> + +#include "src/core/ext/census/census_interface.h" +#include "src/core/ext/census/census_rpc_stats.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/transport/static_metadata.h" + +typedef struct call_data { + census_op_id op_id; + census_context *ctxt; + gpr_timespec start_ts; + int error; + + /* recv callback */ + grpc_metadata_batch *recv_initial_metadata; + grpc_closure *on_done_recv; + grpc_closure finish_recv; +} call_data; + +typedef struct channel_data { uint8_t unused; } channel_data; + +static void extract_and_annotate_method_tag(grpc_metadata_batch *md, + call_data *calld, + channel_data *chand) { + grpc_linked_mdelem *m; + for (m = md->list.head; m != NULL; m = m->next) { + if (m->md->key == GRPC_MDSTR_PATH) { + gpr_log(GPR_DEBUG, "%s", + (const char *)GPR_SLICE_START_PTR(m->md->value->slice)); + /* Add method tag here */ + } + } +} + +static void client_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + if (op->send_initial_metadata) { + extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand); + } +} + +static void client_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + client_mutate_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); +} + +static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, + bool success) { + grpc_call_element *elem = ptr; + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + if (success) { + extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand); + } + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); +} + +static void server_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + if (op->recv_initial_metadata) { + /* substitute our callback for the op callback */ + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->finish_recv; + } +} + +static void server_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + /* TODO(ctiller): this code fails. I don't know why. I expect it's + incomplete, and someone should look at it soon. + + call_data *calld = elem->call_data; + GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); */ + server_mutate_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); +} + +static void client_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + call_data *d = elem->call_data; + GPR_ASSERT(d != NULL); + memset(d, 0, sizeof(*d)); + d->start_ts = gpr_now(GPR_CLOCK_REALTIME); +} + +static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, void *ignored) { + call_data *d = elem->call_data; + GPR_ASSERT(d != NULL); + /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ +} + +static void server_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + call_data *d = elem->call_data; + GPR_ASSERT(d != NULL); + memset(d, 0, sizeof(*d)); + d->start_ts = gpr_now(GPR_CLOCK_REALTIME); + /* TODO(hongyu): call census_tracing_start_op here. */ + grpc_closure_init(&d->finish_recv, server_on_done_recv, elem); +} + +static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, void *ignored) { + call_data *d = elem->call_data; + GPR_ASSERT(d != NULL); + /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ +} + +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(chand != NULL); +} + +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(chand != NULL); +} + +const grpc_channel_filter grpc_client_census_filter = { + client_start_transport_op, + grpc_channel_next_op, + sizeof(call_data), + client_init_call_elem, + grpc_call_stack_ignore_set_pollset, + client_destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "census-client"}; + +const grpc_channel_filter grpc_server_census_filter = { + server_start_transport_op, + grpc_channel_next_op, + sizeof(call_data), + server_init_call_elem, + grpc_call_stack_ignore_set_pollset, + server_destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "census-server"}; diff --git a/src/core/ext/census/grpc_filter.h b/src/core/ext/census/grpc_filter.h new file mode 100644 index 0000000000..a39bd82224 --- /dev/null +++ b/src/core/ext/census/grpc_filter.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_GRPC_FILTER_H +#define GRPC_CORE_EXT_CENSUS_GRPC_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +/* Census filters: provides tracing and stats collection functionalities. It + needs to reside right below the surface filter in the channel stack. */ +extern const grpc_channel_filter grpc_client_census_filter; +extern const grpc_channel_filter grpc_server_census_filter; + +#endif /* GRPC_CORE_EXT_CENSUS_GRPC_FILTER_H */ diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c new file mode 100644 index 0000000000..0f15ecb2c2 --- /dev/null +++ b/src/core/ext/census/grpc_plugin.c @@ -0,0 +1,68 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <limits.h> + +#include <grpc/census.h> + +#include "src/core/ext/census/grpc_filter.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/surface/channel_init.h" + +static bool maybe_add_census_filter(grpc_channel_stack_builder *builder, + void *arg_must_be_null) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + if (grpc_channel_args_is_census_enabled(args)) { + return grpc_channel_stack_builder_prepend_filter( + builder, &grpc_client_census_filter, NULL, NULL); + } + return true; +} + +void census_grpc_plugin_init(void) { + /* Only initialize census if no one else has and some features are + * available. */ + if (census_enabled() == CENSUS_FEATURE_NONE && + census_supported() != CENSUS_FEATURE_NONE) { + if (census_initialize(census_supported())) { /* enable all features. */ + gpr_log(GPR_ERROR, "Could not initialize census."); + } + } + grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, + maybe_add_census_filter, NULL); + grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, + maybe_add_census_filter, NULL); +} + +void census_grpc_plugin_shutdown(void) { census_shutdown(); } diff --git a/src/core/ext/census/hash_table.c b/src/core/ext/census/hash_table.c new file mode 100644 index 0000000000..ee6fdfc6e8 --- /dev/null +++ b/src/core/ext/census/hash_table.c @@ -0,0 +1,303 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/hash_table.h" + +#include <stddef.h> +#include <stdio.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +#define CENSUS_HT_NUM_BUCKETS 1999 + +/* A single hash table data entry */ +typedef struct ht_entry { + census_ht_key key; + void *data; + struct ht_entry *next; +} ht_entry; + +/* hash table bucket */ +typedef struct bucket { + /* NULL if bucket is empty */ + ht_entry *next; + /* -1 if all buckets are empty. */ + int32_t prev_non_empty_bucket; + /* -1 if all buckets are empty. */ + int32_t next_non_empty_bucket; +} bucket; + +struct unresizable_hash_table { + /* Number of entries in the table */ + size_t size; + /* Number of buckets */ + uint32_t num_buckets; + /* Array of buckets initialized at creation time. Memory consumption is + 16 bytes per bucket on a 64-bit platform. */ + bucket *buckets; + /* Index of the first non-empty bucket. -1 iff size == 0. */ + int32_t first_non_empty_bucket; + /* Index of the last non_empty bucket. -1 iff size == 0. */ + int32_t last_non_empty_bucket; + /* Immutable options of this hash table, initialized at creation time. */ + census_ht_option options; +}; + +typedef struct entry_locator { + int32_t bucket_idx; + int is_first_in_chain; + int found; + ht_entry *prev_entry; +} entry_locator; + +/* Asserts if option is not valid. */ +void check_options(const census_ht_option *option) { + GPR_ASSERT(option != NULL); + GPR_ASSERT(option->num_buckets > 0); + GPR_ASSERT(option->key_type == CENSUS_HT_UINT64 || + option->key_type == CENSUS_HT_POINTER); + if (option->key_type == CENSUS_HT_UINT64) { + GPR_ASSERT(option->hash == NULL); + } else if (option->key_type == CENSUS_HT_POINTER) { + GPR_ASSERT(option->hash != NULL); + GPR_ASSERT(option->compare_keys != NULL); + } +} + +#define REMOVE_NEXT(options, ptr) \ + do { \ + ht_entry *tmp = (ptr)->next; \ + (ptr)->next = tmp->next; \ + delete_entry(options, tmp); \ + } while (0) + +static void delete_entry(const census_ht_option *opt, ht_entry *p) { + if (opt->delete_data != NULL) { + opt->delete_data(p->data); + } + if (opt->delete_key != NULL) { + opt->delete_key(p->key.ptr); + } + gpr_free(p); +} + +static uint64_t hash(const census_ht_option *opt, census_ht_key key) { + return opt->key_type == CENSUS_HT_UINT64 ? key.val : opt->hash(key.ptr); +} + +census_ht *census_ht_create(const census_ht_option *option) { + int i; + census_ht *ret = NULL; + check_options(option); + ret = (census_ht *)gpr_malloc(sizeof(census_ht)); + ret->size = 0; + ret->num_buckets = option->num_buckets; + ret->buckets = (bucket *)gpr_malloc(sizeof(bucket) * ret->num_buckets); + ret->options = *option; + /* initialize each bucket */ + for (i = 0; i < ret->options.num_buckets; i++) { + ret->buckets[i].prev_non_empty_bucket = -1; + ret->buckets[i].next_non_empty_bucket = -1; + ret->buckets[i].next = NULL; + } + return ret; +} + +static int32_t find_bucket_idx(const census_ht *ht, census_ht_key key) { + return hash(&ht->options, key) % ht->num_buckets; +} + +static int keys_match(const census_ht_option *opt, const ht_entry *p, + const census_ht_key key) { + GPR_ASSERT(opt->key_type == CENSUS_HT_UINT64 || + opt->key_type == CENSUS_HT_POINTER); + if (opt->key_type == CENSUS_HT_UINT64) return p->key.val == key.val; + return !opt->compare_keys((p->key).ptr, key.ptr); +} + +static entry_locator ht_find(const census_ht *ht, census_ht_key key) { + entry_locator loc = {0, 0, 0, NULL}; + int32_t idx = 0; + ht_entry *ptr = NULL; + GPR_ASSERT(ht != NULL); + idx = find_bucket_idx(ht, key); + ptr = ht->buckets[idx].next; + if (ptr == NULL) { + /* bucket is empty */ + return loc; + } + if (keys_match(&ht->options, ptr, key)) { + loc.bucket_idx = idx; + loc.is_first_in_chain = 1; + loc.found = 1; + return loc; + } else { + for (; ptr->next != NULL; ptr = ptr->next) { + if (keys_match(&ht->options, ptr->next, key)) { + loc.bucket_idx = idx; + loc.is_first_in_chain = 0; + loc.found = 1; + loc.prev_entry = ptr; + return loc; + } + } + } + /* Could not find the key */ + return loc; +} + +void *census_ht_find(const census_ht *ht, census_ht_key key) { + entry_locator loc = ht_find(ht, key); + if (loc.found == 0) { + return NULL; + } + return loc.is_first_in_chain ? ht->buckets[loc.bucket_idx].next->data + : loc.prev_entry->next->data; +} + +void census_ht_insert(census_ht *ht, census_ht_key key, void *data) { + int32_t idx = find_bucket_idx(ht, key); + ht_entry *ptr = NULL; + entry_locator loc = ht_find(ht, key); + if (loc.found) { + /* Replace old value with new value. */ + ptr = loc.is_first_in_chain ? ht->buckets[loc.bucket_idx].next + : loc.prev_entry->next; + if (ht->options.delete_data != NULL) { + ht->options.delete_data(ptr->data); + } + ptr->data = data; + return; + } + + /* first entry in the table. */ + if (ht->size == 0) { + ht->buckets[idx].next_non_empty_bucket = -1; + ht->buckets[idx].prev_non_empty_bucket = -1; + ht->first_non_empty_bucket = idx; + ht->last_non_empty_bucket = idx; + } else if (ht->buckets[idx].next == NULL) { + /* first entry in the bucket. */ + ht->buckets[ht->last_non_empty_bucket].next_non_empty_bucket = idx; + ht->buckets[idx].prev_non_empty_bucket = ht->last_non_empty_bucket; + ht->buckets[idx].next_non_empty_bucket = -1; + ht->last_non_empty_bucket = idx; + } + ptr = (ht_entry *)gpr_malloc(sizeof(ht_entry)); + ptr->key = key; + ptr->data = data; + ptr->next = ht->buckets[idx].next; + ht->buckets[idx].next = ptr; + ht->size++; +} + +void census_ht_erase(census_ht *ht, census_ht_key key) { + entry_locator loc = ht_find(ht, key); + if (loc.found == 0) { + /* noop if not found */ + return; + } + ht->size--; + if (loc.is_first_in_chain) { + bucket *b = &ht->buckets[loc.bucket_idx]; + GPR_ASSERT(b->next != NULL); + /* The only entry in the bucket */ + if (b->next->next == NULL) { + int prev = b->prev_non_empty_bucket; + int next = b->next_non_empty_bucket; + if (prev != -1) { + ht->buckets[prev].next_non_empty_bucket = next; + } else { + ht->first_non_empty_bucket = next; + } + if (next != -1) { + ht->buckets[next].prev_non_empty_bucket = prev; + } else { + ht->last_non_empty_bucket = prev; + } + } + REMOVE_NEXT(&ht->options, b); + } else { + GPR_ASSERT(loc.prev_entry->next != NULL); + REMOVE_NEXT(&ht->options, loc.prev_entry); + } +} + +/* Returns NULL if input table is empty. */ +census_ht_kv *census_ht_get_all_elements(const census_ht *ht, size_t *num) { + census_ht_kv *ret = NULL; + int i = 0; + int32_t idx = -1; + GPR_ASSERT(ht != NULL && num != NULL); + *num = ht->size; + if (*num == 0) { + return NULL; + } + + ret = (census_ht_kv *)gpr_malloc(sizeof(census_ht_kv) * ht->size); + idx = ht->first_non_empty_bucket; + while (idx >= 0) { + ht_entry *ptr = ht->buckets[idx].next; + for (; ptr != NULL; ptr = ptr->next) { + ret[i].k = ptr->key; + ret[i].v = ptr->data; + i++; + } + idx = ht->buckets[idx].next_non_empty_bucket; + } + return ret; +} + +static void ht_delete_entry_chain(const census_ht_option *options, + ht_entry *first) { + if (first == NULL) { + return; + } + if (first->next != NULL) { + ht_delete_entry_chain(options, first->next); + } + delete_entry(options, first); +} + +void census_ht_destroy(census_ht *ht) { + unsigned i; + for (i = 0; i < ht->num_buckets; ++i) { + ht_delete_entry_chain(&ht->options, ht->buckets[i].next); + } + gpr_free(ht->buckets); + gpr_free(ht); +} + +size_t census_ht_get_size(const census_ht *ht) { return ht->size; } diff --git a/src/core/ext/census/hash_table.h b/src/core/ext/census/hash_table.h new file mode 100644 index 0000000000..30ea4264a2 --- /dev/null +++ b/src/core/ext/census/hash_table.h @@ -0,0 +1,131 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_HASH_TABLE_H +#define GRPC_CORE_EXT_CENSUS_HASH_TABLE_H + +#include <stddef.h> + +#include <grpc/support/port_platform.h> + +/* A chain based hash table with fixed number of buckets. + Your probably shouldn't use this code directly. It is implemented for the + use case in census trace store and stats store, where number of entries in + the table is in the scale of upto several thousands, entries are added and + removed from the table very frequently (~100k/s), the frequency of find() + operations is roughly several times of the frequency of insert() and erase() + Comparing to find(), the insert(), erase() and get_all_entries() operations + are much less freqent (<1/s). + + Per bucket memory overhead is about (8 + sizeof(intptr_t) bytes. + Per entry memory overhead is about (8 + 2 * sizeof(intptr_t) bytes. + + All functions are not thread-safe. Synchronization will be provided in the + upper layer (in trace store and stats store). +*/ + +/* Opaque hash table struct */ +typedef struct unresizable_hash_table census_ht; + +/* Currently, the hash_table can take two types of keys. (uint64 for trace + store and const char* for stats store). */ +typedef union { + uint64_t val; + void *ptr; +} census_ht_key; + +typedef enum census_ht_key_type { + CENSUS_HT_UINT64 = 0, + CENSUS_HT_POINTER = 1 +} census_ht_key_type; + +typedef struct census_ht_option { + /* Type of hash key */ + census_ht_key_type key_type; + /* Desired number of buckets, preferably a prime number */ + int32_t num_buckets; + /* Fucntion to calculate uint64 hash value of the key. Only takes effect if + key_type is POINTER. */ + uint64_t (*hash)(const void *); + /* Function to compare two keys, returns 0 iff equal. Only takes effect if + key_type is POINTER */ + int (*compare_keys)(const void *k1, const void *k2); + /* Value deleter. NULL if no specialized delete function is needed. */ + void (*delete_data)(void *); + /* Key deleter. NULL if table does not own the key. (e.g. key is part of the + value or key is not owned by the table.) */ + void (*delete_key)(void *); +} census_ht_option; + +/* Creates a hashtable with fixed number of buckets according to the settings + specified in 'options' arg. Function pointers "hash" and "compare_keys" must + be provided if key_type is POINTER. Asserts if fail to create. */ +census_ht *census_ht_create(const census_ht_option *options); + +/* Deletes hash table instance. Frees all dynamic memory owned by ht.*/ +void census_ht_destroy(census_ht *ht); + +/* Inserts the input key-val pair into hash_table. If an entry with the same key + exists in the table, the corresponding value will be overwritten by the input + val. */ +void census_ht_insert(census_ht *ht, census_ht_key key, void *val); + +/* Returns pointer to data, returns NULL if not found. */ +void *census_ht_find(const census_ht *ht, census_ht_key key); + +/* Erase hash table entry with input key. Noop if key is not found. */ +void census_ht_erase(census_ht *ht, census_ht_key key); + +typedef struct census_ht_kv { + census_ht_key k; + void *v; +} census_ht_kv; + +/* Returns an array of pointers to all values in the hash table. Order of the + elements can be arbitrary. Sets 'num' to the size of returned array. Caller + owns returned array. */ +census_ht_kv *census_ht_get_all_elements(const census_ht *ht, size_t *num); + +/* Returns number of elements kept. */ +size_t census_ht_get_size(const census_ht *ht); + +/* Functor applied on each key-value pair while iterating through entries in the + table. The functor should not mutate data. */ +typedef void (*census_ht_itr_cb)(census_ht_key key, const void *val_ptr, + void *state); + +/* Iterates through all key-value pairs in the hash_table. The callback function + should not invalidate data entries. */ +uint64_t census_ht_for_all(const census_ht *ht, census_ht_itr_cb); + +#endif /* GRPC_CORE_EXT_CENSUS_HASH_TABLE_H */ diff --git a/src/core/ext/census/initialize.c b/src/core/ext/census/initialize.c new file mode 100644 index 0000000000..896276e44a --- /dev/null +++ b/src/core/ext/census/initialize.c @@ -0,0 +1,54 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> + +static int features_enabled = CENSUS_FEATURE_NONE; + +int census_initialize(int features) { + if (features_enabled != CENSUS_FEATURE_NONE) { + // Must have been a previous call to census_initialize; return error + return 1; + } + features_enabled = features; + return 0; +} + +void census_shutdown(void) { features_enabled = CENSUS_FEATURE_NONE; } + +int census_supported(void) { + /* TODO(aveitch): improve this as we implement features... */ + return CENSUS_FEATURE_NONE; +} + +int census_enabled(void) { return features_enabled; } diff --git a/src/core/ext/census/mlog.c b/src/core/ext/census/mlog.c new file mode 100644 index 0000000000..698b7096ab --- /dev/null +++ b/src/core/ext/census/mlog.c @@ -0,0 +1,600 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Implements an efficient in-memory log, optimized for multiple writers and +// a single reader. Available log space is divided up in blocks of +// CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the following +// three data structures: +// - Free blocks (free_block_list) +// - Blocks with unread data (dirty_block_list) +// - Blocks currently attached to cores (core_local_blocks[]) +// +// census_log_start_write() moves a block from core_local_blocks[] to the end of +// dirty_block_list when block: +// - is out-of-space OR +// - has an incomplete record (an incomplete record occurs when a thread calls +// census_log_start_write() and is context-switched before calling +// census_log_end_write() +// So, blocks in dirty_block_list are ordered, from oldest to newest, by the +// time when block is detached from the core. +// +// census_log_read_next() first iterates over dirty_block_list and then +// core_local_blocks[]. It moves completely read blocks from dirty_block_list +// to free_block_list. Blocks in core_local_blocks[] are not freed, even when +// completely read. +// +// If the log is configured to discard old records and free_block_list is empty, +// census_log_start_write() iterates over dirty_block_list to allocate a +// new block. It moves the oldest available block (no pending read/write) to +// core_local_blocks[]. +// +// core_local_block_struct is used to implement a map from core id to the block +// associated with that core. This mapping is advisory. It is possible that the +// block returned by this mapping is no longer associated with that core. This +// mapping is updated, lazily, by census_log_start_write(). +// +// Locking in block struct: +// +// Exclusive g_log.lock must be held before calling any functions operating on +// block structs except census_log_start_write() and census_log_end_write(). +// +// Writes to a block are serialized via writer_lock. census_log_start_write() +// acquires this lock and census_log_end_write() releases it. On failure to +// acquire the lock, writer allocates a new block for the current core and +// updates core_local_block accordingly. +// +// Simultaneous read and write access is allowed. Readers can safely read up to +// committed bytes (bytes_committed). +// +// reader_lock protects the block, currently being read, from getting recycled. +// start_read() acquires reader_lock and end_read() releases the lock. +// +// Read/write access to a block is disabled via try_disable_access(). It returns +// with both writer_lock and reader_lock held. These locks are subsequently +// released by enable_access() to enable access to the block. +// +// A note on naming: Most function/struct names are prepended by cl_ +// (shorthand for census_log). Further, functions that manipulate structures +// include the name of the structure, which will be passed as the first +// argument. E.g. cl_block_initialize() will initialize a cl_block. + +#include "src/core/ext/census/mlog.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/cpu.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> +#include <stdbool.h> +#include <string.h> + +// End of platform specific code + +typedef struct census_log_block_list_struct { + struct census_log_block_list_struct* next; + struct census_log_block_list_struct* prev; + struct census_log_block* block; +} cl_block_list_struct; + +typedef struct census_log_block { + // Pointer to underlying buffer. + char* buffer; + gpr_atm writer_lock; + gpr_atm reader_lock; + // Keeps completely written bytes. Declared atomic because accessed + // simultaneously by reader and writer. + gpr_atm bytes_committed; + // Bytes already read. + size_t bytes_read; + // Links for list. + cl_block_list_struct link; +// We want this structure to be cacheline aligned. We assume the following +// sizes for the various parts on 32/64bit systems: +// type 32b size 64b size +// char* 4 8 +// 3x gpr_atm 12 24 +// size_t 4 8 +// cl_block_list_struct 12 24 +// TOTAL 32 64 +// +// Depending on the size of our cacheline and the architecture, we +// selectively add char buffering to this structure. The size is checked +// via assert in census_log_initialize(). +#if defined(GPR_ARCH_64) +#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64) +#else +#if defined(GPR_ARCH_32) +#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32) +#else +#error "Unknown architecture" +#endif +#endif +#if CL_BLOCK_PAD_SIZE > 0 + char padding[CL_BLOCK_PAD_SIZE]; +#endif +} cl_block; + +// A list of cl_blocks, doubly-linked through cl_block::link. +typedef struct census_log_block_list { + int32_t count; // Number of items in list. + cl_block_list_struct ht; // head/tail of linked list. +} cl_block_list; + +// Cacheline aligned block pointers to avoid false sharing. Block pointer must +// be initialized via set_block(), before calling other functions +typedef struct census_log_core_local_block { + gpr_atm block; +// Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 +#if defined(GPR_ARCH_64) +#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8) +#else +#if defined(GPR_ARCH_32) +#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4) +#else +#error "Unknown architecture" +#endif +#endif +#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0 + char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE]; +#endif +} cl_core_local_block; + +struct census_log { + int discard_old_records; + // Number of cores (aka hardware-contexts) + unsigned num_cores; + // number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log + uint32_t num_blocks; + cl_block* blocks; // Block metadata. + cl_core_local_block* core_local_blocks; // Keeps core to block mappings. + gpr_mu lock; + int initialized; // has log been initialized? + // Keeps the state of the reader iterator. A value of 0 indicates that + // iterator has reached the end. census_log_init_reader() resets the value + // to num_core to restart iteration. + uint32_t read_iterator_state; + // Points to the block being read. If non-NULL, the block is locked for + // reading(block_being_read_->reader_lock is held). + cl_block* block_being_read; + char* buffer; + cl_block_list free_block_list; + cl_block_list dirty_block_list; + gpr_atm out_of_space_count; +}; + +// Single internal log. +static struct census_log g_log; + +// Functions that operate on an atomic memory location used as a lock. + +// Returns non-zero if lock is acquired. +static int cl_try_lock(gpr_atm* lock) { return gpr_atm_acq_cas(lock, 0, 1); } + +static void cl_unlock(gpr_atm* lock) { gpr_atm_rel_store(lock, 0); } + +// Functions that operate on cl_core_local_block's. + +static void cl_core_local_block_set_block(cl_core_local_block* clb, + cl_block* block) { + gpr_atm_rel_store(&clb->block, (gpr_atm)block); +} + +static cl_block* cl_core_local_block_get_block(cl_core_local_block* clb) { + return (cl_block*)gpr_atm_acq_load(&clb->block); +} + +// Functions that operate on cl_block_list_struct's. + +static void cl_block_list_struct_initialize(cl_block_list_struct* bls, + cl_block* block) { + bls->next = bls->prev = bls; + bls->block = block; +} + +// Functions that operate on cl_block_list's. + +static void cl_block_list_initialize(cl_block_list* list) { + list->count = 0; + cl_block_list_struct_initialize(&list->ht, NULL); +} + +// Returns head of *this, or NULL if empty. +static cl_block* cl_block_list_head(cl_block_list* list) { + return list->ht.next->block; +} + +// Insert element *e after *pos. +static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos, + cl_block_list_struct* e) { + list->count++; + e->next = pos->next; + e->prev = pos; + e->next->prev = e; + e->prev->next = e; +} + +// Insert block at the head of the list +static void cl_block_list_insert_at_head(cl_block_list* list, cl_block* block) { + cl_block_list_insert(list, &list->ht, &block->link); +} + +// Insert block at the tail of the list. +static void cl_block_list_insert_at_tail(cl_block_list* list, cl_block* block) { + cl_block_list_insert(list, list->ht.prev, &block->link); +} + +// Removes block *b. Requires *b be in the list. +static void cl_block_list_remove(cl_block_list* list, cl_block* b) { + list->count--; + b->link.next->prev = b->link.prev; + b->link.prev->next = b->link.next; +} + +// Functions that operate on cl_block's + +static void cl_block_initialize(cl_block* block, char* buffer) { + block->buffer = buffer; + gpr_atm_rel_store(&block->writer_lock, 0); + gpr_atm_rel_store(&block->reader_lock, 0); + gpr_atm_rel_store(&block->bytes_committed, 0); + block->bytes_read = 0; + cl_block_list_struct_initialize(&block->link, block); +} + +// Guards against exposing partially written buffer to the reader. +static void cl_block_set_bytes_committed(cl_block* block, + size_t bytes_committed) { + gpr_atm_rel_store(&block->bytes_committed, (gpr_atm)bytes_committed); +} + +static size_t cl_block_get_bytes_committed(cl_block* block) { + return (size_t)gpr_atm_acq_load(&block->bytes_committed); +} + +// Tries to disable future read/write access to this block. Succeeds if: +// - no in-progress write AND +// - no in-progress read AND +// - 'discard_data' set to true OR no unread data +// On success, clears the block state and returns with writer_lock_ and +// reader_lock_ held. These locks are released by a subsequent +// cl_block_access_enable() call. +static bool cl_block_try_disable_access(cl_block* block, int discard_data) { + if (!cl_try_lock(&block->writer_lock)) { + return false; + } + if (!cl_try_lock(&block->reader_lock)) { + cl_unlock(&block->writer_lock); + return false; + } + if (!discard_data && + (block->bytes_read != cl_block_get_bytes_committed(block))) { + cl_unlock(&block->reader_lock); + cl_unlock(&block->writer_lock); + return false; + } + cl_block_set_bytes_committed(block, 0); + block->bytes_read = 0; + return true; +} + +static void cl_block_enable_access(cl_block* block) { + cl_unlock(&block->reader_lock); + cl_unlock(&block->writer_lock); +} + +// Returns with writer_lock held. +static void* cl_block_start_write(cl_block* block, size_t size) { + if (!cl_try_lock(&block->writer_lock)) { + return NULL; + } + size_t bytes_committed = cl_block_get_bytes_committed(block); + if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) { + cl_unlock(&block->writer_lock); + return NULL; + } + return block->buffer + bytes_committed; +} + +// Releases writer_lock and increments committed bytes by 'bytes_written'. +// 'bytes_written' must be <= 'size' specified in the corresponding +// StartWrite() call. This function is thread-safe. +static void cl_block_end_write(cl_block* block, size_t bytes_written) { + cl_block_set_bytes_committed( + block, cl_block_get_bytes_committed(block) + bytes_written); + cl_unlock(&block->writer_lock); +} + +// Returns a pointer to the first unread byte in buffer. The number of bytes +// available are returned in 'bytes_available'. Acquires reader lock that is +// released by a subsequent cl_block_end_read() call. Returns NULL if: +// - read in progress +// - no data available +static void* cl_block_start_read(cl_block* block, size_t* bytes_available) { + if (!cl_try_lock(&block->reader_lock)) { + return NULL; + } + // bytes_committed may change from under us. Use bytes_available to update + // bytes_read below. + size_t bytes_committed = cl_block_get_bytes_committed(block); + GPR_ASSERT(bytes_committed >= block->bytes_read); + *bytes_available = bytes_committed - block->bytes_read; + if (*bytes_available == 0) { + cl_unlock(&block->reader_lock); + return NULL; + } + void* record = block->buffer + block->bytes_read; + block->bytes_read += *bytes_available; + return record; +} + +static void cl_block_end_read(cl_block* block) { + cl_unlock(&block->reader_lock); +} + +// Internal functions operating on g_log + +// Allocates a new free block (or recycles an available dirty block if log is +// configured to discard old records). Returns NULL if out-of-space. +static cl_block* cl_allocate_block(void) { + cl_block* block = cl_block_list_head(&g_log.free_block_list); + if (block != NULL) { + cl_block_list_remove(&g_log.free_block_list, block); + return block; + } + if (!g_log.discard_old_records) { + // No free block and log is configured to keep old records. + return NULL; + } + // Recycle dirty block. Start from the oldest. + for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL; + block = block->link.next->block) { + if (cl_block_try_disable_access(block, 1 /* discard data */)) { + cl_block_list_remove(&g_log.dirty_block_list, block); + return block; + } + } + return NULL; +} + +// Allocates a new block and updates core id => block mapping. 'old_block' +// points to the block that the caller thinks is attached to +// 'core_id'. 'old_block' may be NULL. Returns true if: +// - allocated a new block OR +// - 'core_id' => 'old_block' mapping changed (another thread allocated a +// block before lock was acquired). +static bool cl_allocate_core_local_block(uint32_t core_id, + cl_block* old_block) { + // Now that we have the lock, check if core-local mapping has changed. + cl_core_local_block* core_local_block = &g_log.core_local_blocks[core_id]; + cl_block* block = cl_core_local_block_get_block(core_local_block); + if ((block != NULL) && (block != old_block)) { + return true; + } + if (block != NULL) { + cl_core_local_block_set_block(core_local_block, NULL); + cl_block_list_insert_at_tail(&g_log.dirty_block_list, block); + } + block = cl_allocate_block(); + if (block == NULL) { + return false; + } + cl_core_local_block_set_block(core_local_block, block); + cl_block_enable_access(block); + return true; +} + +static cl_block* cl_get_block(void* record) { + uintptr_t p = (uintptr_t)((char*)record - g_log.buffer); + uintptr_t index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE; + return &g_log.blocks[index]; +} + +// Gets the next block to read and tries to free 'prev' block (if not NULL). +// Returns NULL if reached the end. +static cl_block* cl_next_block_to_read(cl_block* prev) { + cl_block* block = NULL; + if (g_log.read_iterator_state == g_log.num_cores) { + // We are traversing dirty list; find the next dirty block. + if (prev != NULL) { + // Try to free the previous block if there is no unread data. This + // block + // may have unread data if previously incomplete record completed + // between + // read_next() calls. + block = prev->link.next->block; + if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) { + cl_block_list_remove(&g_log.dirty_block_list, prev); + cl_block_list_insert_at_head(&g_log.free_block_list, prev); + } + } else { + block = cl_block_list_head(&g_log.dirty_block_list); + } + if (block != NULL) { + return block; + } + // We are done with the dirty list; moving on to core-local blocks. + } + while (g_log.read_iterator_state > 0) { + g_log.read_iterator_state--; + block = cl_core_local_block_get_block( + &g_log.core_local_blocks[g_log.read_iterator_state]); + if (block != NULL) { + return block; + } + } + return NULL; +} + +#define CL_LOG_2_MB 20 // 2^20 = 1MB + +// External functions: primary stats_log interface +void census_log_initialize(size_t size_in_mb, int discard_old_records) { + // Check cacheline alignment. + GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0); + GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0); + GPR_ASSERT(!g_log.initialized); + g_log.discard_old_records = discard_old_records; + g_log.num_cores = gpr_cpu_num_cores(); + // Ensure that we will not get any overflow in calaculating num_blocks + GPR_ASSERT(CL_LOG_2_MB >= CENSUS_LOG_2_MAX_RECORD_SIZE); + GPR_ASSERT(size_in_mb < 1000); + // Ensure at least 2x as many blocks as there are cores. + g_log.num_blocks = + (uint32_t)GPR_MAX(2 * g_log.num_cores, (size_in_mb << CL_LOG_2_MB) >> + CENSUS_LOG_2_MAX_RECORD_SIZE); + gpr_mu_init(&g_log.lock); + g_log.read_iterator_state = 0; + g_log.block_being_read = NULL; + g_log.core_local_blocks = (cl_core_local_block*)gpr_malloc_aligned( + g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG); + memset(g_log.core_local_blocks, 0, + g_log.num_cores * sizeof(cl_core_local_block)); + g_log.blocks = (cl_block*)gpr_malloc_aligned( + g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE_LOG); + memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block)); + g_log.buffer = gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE); + memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE); + cl_block_list_initialize(&g_log.free_block_list); + cl_block_list_initialize(&g_log.dirty_block_list); + for (uint32_t i = 0; i < g_log.num_blocks; ++i) { + cl_block* block = g_log.blocks + i; + cl_block_initialize(block, g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * i)); + cl_block_try_disable_access(block, 1 /* discard data */); + cl_block_list_insert_at_tail(&g_log.free_block_list, block); + } + gpr_atm_rel_store(&g_log.out_of_space_count, 0); + g_log.initialized = 1; +} + +void census_log_shutdown(void) { + GPR_ASSERT(g_log.initialized); + gpr_mu_destroy(&g_log.lock); + gpr_free_aligned(g_log.core_local_blocks); + g_log.core_local_blocks = NULL; + gpr_free_aligned(g_log.blocks); + g_log.blocks = NULL; + gpr_free(g_log.buffer); + g_log.buffer = NULL; + g_log.initialized = 0; +} + +void* census_log_start_write(size_t size) { + // Used to bound number of times block allocation is attempted. + GPR_ASSERT(size > 0); + GPR_ASSERT(g_log.initialized); + if (size > CENSUS_LOG_MAX_RECORD_SIZE) { + return NULL; + } + uint32_t attempts_remaining = g_log.num_blocks; + uint32_t core_id = gpr_cpu_current_cpu(); + do { + void* record = NULL; + cl_block* block = + cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]); + if (block && (record = cl_block_start_write(block, size))) { + return record; + } + // Need to allocate a new block. We are here if: + // - No block associated with the core OR + // - Write in-progress on the block OR + // - block is out of space + gpr_mu_lock(&g_log.lock); + bool allocated = cl_allocate_core_local_block(core_id, block); + gpr_mu_unlock(&g_log.lock); + if (!allocated) { + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; + } + } while (attempts_remaining--); + // Give up. + gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1); + return NULL; +} + +void census_log_end_write(void* record, size_t bytes_written) { + GPR_ASSERT(g_log.initialized); + cl_block_end_write(cl_get_block(record), bytes_written); +} + +void census_log_init_reader(void) { + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + // If a block is locked for reading unlock it. + if (g_log.block_being_read != NULL) { + cl_block_end_read(g_log.block_being_read); + g_log.block_being_read = NULL; + } + g_log.read_iterator_state = g_log.num_cores; + gpr_mu_unlock(&g_log.lock); +} + +const void* census_log_read_next(size_t* bytes_available) { + GPR_ASSERT(g_log.initialized); + gpr_mu_lock(&g_log.lock); + if (g_log.block_being_read != NULL) { + cl_block_end_read(g_log.block_being_read); + } + do { + g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read); + if (g_log.block_being_read != NULL) { + void* record = + cl_block_start_read(g_log.block_being_read, bytes_available); + if (record != NULL) { + gpr_mu_unlock(&g_log.lock); + return record; + } + } + } while (g_log.block_being_read != NULL); + gpr_mu_unlock(&g_log.lock); + return NULL; +} + +size_t census_log_remaining_space(void) { + GPR_ASSERT(g_log.initialized); + size_t space = 0; + gpr_mu_lock(&g_log.lock); + if (g_log.discard_old_records) { + // Remaining space is not meaningful; just return the entire log space. + space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE; + } else { + GPR_ASSERT(g_log.free_block_list.count >= 0); + space = (size_t)g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE; + } + gpr_mu_unlock(&g_log.lock); + return space; +} + +int64_t census_log_out_of_space_count(void) { + GPR_ASSERT(g_log.initialized); + return gpr_atm_acq_load(&g_log.out_of_space_count); +} diff --git a/src/core/ext/census/mlog.h b/src/core/ext/census/mlog.h new file mode 100644 index 0000000000..a256426f91 --- /dev/null +++ b/src/core/ext/census/mlog.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* A very fast in-memory log, optimized for multiple writers. */ + +#ifndef GRPC_CORE_EXT_CENSUS_MLOG_H +#define GRPC_CORE_EXT_CENSUS_MLOG_H + +#include <grpc/support/port_platform.h> +#include <stddef.h> + +/* Maximum record size, in bytes. */ +#define CENSUS_LOG_2_MAX_RECORD_SIZE 14 /* 2^14 = 16KB */ +#define CENSUS_LOG_MAX_RECORD_SIZE (1 << CENSUS_LOG_2_MAX_RECORD_SIZE) + +/* Initialize the statistics logging subsystem with the given log size. A log + size of 0 will result in the smallest possible log for the platform + (approximately CENSUS_LOG_MAX_RECORD_SIZE * gpr_cpu_num_cores()). If + discard_old_records is non-zero, then new records will displace older ones + when the log is full. This function must be called before any other + census_log functions. +*/ +void census_log_initialize(size_t size_in_mb, int discard_old_records); + +/* Shutdown the logging subsystem. Caller must ensure that: + - no in progress or future call to any census_log functions + - no incomplete records +*/ +void census_log_shutdown(void); + +/* Allocates and returns a 'size' bytes record and marks it in use. A + subsequent census_log_end_write() marks the record complete. The + 'bytes_written' census_log_end_write() argument must be <= + 'size'. Returns NULL if out-of-space AND: + - log is configured to keep old records OR + - all blocks are pinned by incomplete records. +*/ +void* census_log_start_write(size_t size); + +void census_log_end_write(void* record, size_t bytes_written); + +void census_log_init_reader(void); + +/* census_log_read_next() iterates over blocks with data and for each block + returns a pointer to the first unread byte. The number of bytes that can be + read are returned in 'bytes_available'. Reader is expected to read all + available data. Reading the data consumes it i.e. it cannot be read again. + census_log_read_next() returns NULL if the end is reached i.e last block + is read. census_log_init_reader() starts the iteration or aborts the + current iteration. +*/ +const void* census_log_read_next(size_t* bytes_available); + +/* Returns estimated remaining space across all blocks, in bytes. If log is + configured to discard old records, returns total log space. Otherwise, + returns space available in empty blocks (partially filled blocks are + treated as full). +*/ +size_t census_log_remaining_space(void); + +/* Returns the number of times gprc_stats_log_start_write() failed due to + out-of-space. */ +int64_t census_log_out_of_space_count(void); + +#endif /* GRPC_CORE_EXT_CENSUS_MLOG_H */ diff --git a/src/core/ext/census/operation.c b/src/core/ext/census/operation.c new file mode 100644 index 0000000000..5c58704372 --- /dev/null +++ b/src/core/ext/census/operation.c @@ -0,0 +1,63 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> + +/* TODO(aveitch): These are all placeholder implementations. */ + +census_timestamp census_start_rpc_op_timestamp(void) { + census_timestamp ct; + /* TODO(aveitch): assumes gpr_timespec implementation of census_timestamp. */ + ct.ts = gpr_now(GPR_CLOCK_MONOTONIC); + return ct; +} + +census_context *census_start_client_rpc_op( + const census_context *context, int64_t rpc_name_id, + const census_rpc_name_info *rpc_name_info, const char *peer, int trace_mask, + const census_timestamp *start_time) { + return NULL; +} + +census_context *census_start_server_rpc_op( + const char *buffer, int64_t rpc_name_id, + const census_rpc_name_info *rpc_name_info, const char *peer, int trace_mask, + census_timestamp *start_time) { + return NULL; +} + +census_context *census_start_op(census_context *context, const char *family, + const char *name, int trace_mask) { + return NULL; +} + +void census_end_op(census_context *context, int status) {} diff --git a/src/core/ext/census/placeholders.c b/src/core/ext/census/placeholders.c new file mode 100644 index 0000000000..fe23d13971 --- /dev/null +++ b/src/core/ext/census/placeholders.c @@ -0,0 +1,109 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> + +#include <grpc/support/log.h> + +/* Placeholders for the pending APIs */ + +int census_get_trace_record(census_trace_record *trace_record) { + (void)trace_record; + abort(); +} + +void census_record_values(census_context *context, census_value *values, + size_t nvalues) { + (void)context; + (void)values; + (void)nvalues; + abort(); +} + +void census_set_rpc_client_peer(census_context *context, const char *peer) { + (void)context; + (void)peer; + abort(); +} + +void census_trace_scan_end() { abort(); } + +int census_trace_scan_start(int consume) { + (void)consume; + abort(); +} + +const census_aggregation *census_view_aggregrations(const census_view *view) { + (void)view; + abort(); +} + +census_view *census_view_create(uint32_t metric_id, const census_context *tags, + const census_aggregation *aggregations, + size_t naggregations) { + (void)metric_id; + (void)tags; + (void)aggregations; + (void)naggregations; + abort(); +} + +const census_context *census_view_tags(const census_view *view) { + (void)view; + abort(); +} + +void census_view_delete(census_view *view) { + (void)view; + abort(); +} + +const census_view_data *census_view_get_data(const census_view *view) { + (void)view; + abort(); +} + +size_t census_view_metric(const census_view *view) { + (void)view; + abort(); +} + +size_t census_view_naggregations(const census_view *view) { + (void)view; + abort(); +} + +void census_view_reset(census_view *view) { + (void)view; + abort(); +} diff --git a/src/core/ext/census/rpc_metric_id.h b/src/core/ext/census/rpc_metric_id.h new file mode 100644 index 0000000000..888ec500a7 --- /dev/null +++ b/src/core/ext/census/rpc_metric_id.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_RPC_METRIC_ID_H +#define GRPC_CORE_EXT_CENSUS_RPC_METRIC_ID_H + +/* Metric ID's used for RPC measurements. */ +/* Count of client requests sent. */ +#define CENSUS_METRIC_RPC_CLIENT_REQUESTS ((uint32_t)0) +/* Count of server requests sent. */ +#define CENSUS_METRIC_RPC_SERVER_REQUESTS ((uint32_t)1) +/* Client error counts. */ +#define CENSUS_METRIC_RPC_CLIENT_ERRORS ((uint32_t)2) +/* Server error counts. */ +#define CENSUS_METRIC_RPC_SERVER_ERRORS ((uint32_t)3) +/* Client side request latency. */ +#define CENSUS_METRIC_RPC_CLIENT_LATENCY ((uint32_t)4) +/* Server side request latency. */ +#define CENSUS_METRIC_RPC_SERVER_LATENCY ((uint32_t)5) + +#endif /* GRPC_CORE_EXT_CENSUS_RPC_METRIC_ID_H */ diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c new file mode 100644 index 0000000000..3b5d6dab2b --- /dev/null +++ b/src/core/ext/census/tracing.c @@ -0,0 +1,45 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> + +/* TODO(aveitch): These are all placeholder implementations. */ + +int census_trace_mask(const census_context *context) { + return CENSUS_TRACE_MASK_NONE; +} + +void census_set_trace_mask(int trace_mask) {} + +void census_trace_print(census_context *context, uint32_t type, + const char *buffer, size_t n) {} diff --git a/src/core/ext/census/window_stats.c b/src/core/ext/census/window_stats.c new file mode 100644 index 0000000000..5f7bd9952e --- /dev/null +++ b/src/core/ext/census/window_stats.c @@ -0,0 +1,316 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/census/window_stats.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include <math.h> +#include <stddef.h> +#include <string.h> + +/* typedefs make typing long names easier. Use cws (for census_window_stats) */ +typedef census_window_stats_stat_info cws_stat_info; +typedef struct census_window_stats_sum cws_sum; + +/* Each interval is composed of a number of buckets, which hold a count of + entries and a single statistic */ +typedef struct census_window_stats_bucket { + int64_t count; + void *statistic; +} cws_bucket; + +/* Each interval has a set of buckets, and the variables needed to keep + track of their current state */ +typedef struct census_window_stats_interval_stats { + /* The buckets. There will be 'granularity' + 1 of these. */ + cws_bucket *buckets; + /* Index of the bucket containing the smallest time interval. */ + int bottom_bucket; + /* The smallest time storable in the current window. */ + int64_t bottom; + /* The largest time storable in the current window + 1ns */ + int64_t top; + /* The width of each bucket in ns. */ + int64_t width; +} cws_interval_stats; + +typedef struct census_window_stats { + /* Number of intervals. */ + int nintervals; + /* Number of buckets in each interval. 'granularity' + 1. */ + int nbuckets; + /* Record of stat_info. */ + cws_stat_info stat_info; + /* Stats for each interval. */ + cws_interval_stats *interval_stats; + /* The time the newset stat was recorded. */ + int64_t newest_time; +} window_stats; + +/* Calculate an actual bucket index from a logical index 'IDX'. Other + parameters supply information on the interval struct and overall stats. */ +#define BUCKET_IDX(IS, IDX, WSTATS) \ + ((IS->bottom_bucket + (IDX)) % WSTATS->nbuckets) + +/* The maximum seconds value we can have in a valid timespec. More than this + will result in overflow in timespec_to_ns(). This works out to ~292 years. + TODO: consider using doubles instead of int64. */ +static int64_t max_seconds = (GPR_INT64_MAX - GPR_NS_PER_SEC) / GPR_NS_PER_SEC; + +static int64_t timespec_to_ns(const gpr_timespec ts) { + if (ts.tv_sec > max_seconds) { + return GPR_INT64_MAX - 1; + } + return ts.tv_sec * GPR_NS_PER_SEC + ts.tv_nsec; +} + +static void cws_initialize_statistic(void *statistic, + const cws_stat_info *stat_info) { + if (stat_info->stat_initialize == NULL) { + memset(statistic, 0, stat_info->stat_size); + } else { + stat_info->stat_initialize(statistic); + } +} + +/* Create and initialize a statistic */ +static void *cws_create_statistic(const cws_stat_info *stat_info) { + void *stat = gpr_malloc(stat_info->stat_size); + cws_initialize_statistic(stat, stat_info); + return stat; +} + +window_stats *census_window_stats_create(int nintervals, + const gpr_timespec intervals[], + int granularity, + const cws_stat_info *stat_info) { + window_stats *ret; + int i; + /* validate inputs */ + GPR_ASSERT(nintervals > 0 && granularity > 2 && intervals != NULL && + stat_info != NULL); + for (i = 0; i < nintervals; i++) { + int64_t ns = timespec_to_ns(intervals[i]); + GPR_ASSERT(intervals[i].tv_sec >= 0 && intervals[i].tv_nsec >= 0 && + intervals[i].tv_nsec < GPR_NS_PER_SEC && ns >= 100 && + granularity * 10 <= ns); + } + /* Allocate and initialize relevant data structures */ + ret = (window_stats *)gpr_malloc(sizeof(window_stats)); + ret->nintervals = nintervals; + ret->nbuckets = granularity + 1; + ret->stat_info = *stat_info; + ret->interval_stats = + (cws_interval_stats *)gpr_malloc(nintervals * sizeof(cws_interval_stats)); + for (i = 0; i < nintervals; i++) { + int64_t size_ns = timespec_to_ns(intervals[i]); + cws_interval_stats *is = ret->interval_stats + i; + cws_bucket *buckets = is->buckets = + (cws_bucket *)gpr_malloc(ret->nbuckets * sizeof(cws_bucket)); + int b; + for (b = 0; b < ret->nbuckets; b++) { + buckets[b].statistic = cws_create_statistic(stat_info); + buckets[b].count = 0; + } + is->bottom_bucket = 0; + is->bottom = 0; + is->width = size_ns / granularity; + /* Check for possible overflow issues, and maximize interval size if the + user requested something large enough. */ + if ((GPR_INT64_MAX - is->width) > size_ns) { + is->top = size_ns + is->width; + } else { + is->top = GPR_INT64_MAX; + is->width = GPR_INT64_MAX / (granularity + 1); + } + /* If size doesn't divide evenly, we can have a width slightly too small; + better to have it slightly large. */ + if ((size_ns - (granularity + 1) * is->width) > 0) { + is->width += 1; + } + } + ret->newest_time = 0; + return ret; +} + +/* When we try adding a measurement above the current interval range, we + need to "shift" the buckets sufficiently to cover the new range. */ +static void cws_shift_buckets(const window_stats *wstats, + cws_interval_stats *is, int64_t when_ns) { + int i; + /* number of bucket time widths to "shift" */ + int shift; + /* number of buckets to clear */ + int nclear; + GPR_ASSERT(when_ns >= is->top); + /* number of bucket time widths to "shift" */ + shift = ((when_ns - is->top) / is->width) + 1; + /* number of buckets to clear - limited by actual number of buckets */ + nclear = GPR_MIN(shift, wstats->nbuckets); + for (i = 0; i < nclear; i++) { + int b = BUCKET_IDX(is, i, wstats); + is->buckets[b].count = 0; + cws_initialize_statistic(is->buckets[b].statistic, &wstats->stat_info); + } + /* adjust top/bottom times and current bottom bucket */ + is->bottom_bucket = BUCKET_IDX(is, shift, wstats); + is->top += shift * is->width; + is->bottom += shift * is->width; +} + +void census_window_stats_add(window_stats *wstats, const gpr_timespec when, + const void *stat_value) { + int i; + int64_t when_ns = timespec_to_ns(when); + GPR_ASSERT(wstats->interval_stats != NULL); + for (i = 0; i < wstats->nintervals; i++) { + cws_interval_stats *is = wstats->interval_stats + i; + cws_bucket *bucket; + if (when_ns < is->bottom) { /* Below smallest time in interval: drop */ + continue; + } + if (when_ns >= is->top) { /* above limit: shift buckets */ + cws_shift_buckets(wstats, is, when_ns); + } + /* Add the stat. */ + GPR_ASSERT(is->bottom <= when_ns && when_ns < is->top); + bucket = is->buckets + + BUCKET_IDX(is, (when_ns - is->bottom) / is->width, wstats); + bucket->count++; + wstats->stat_info.stat_add(bucket->statistic, stat_value); + } + if (when_ns > wstats->newest_time) { + wstats->newest_time = when_ns; + } +} + +/* Add a specific bucket contents to an accumulating total. */ +static void cws_add_bucket_to_sum(cws_sum *sum, const cws_bucket *bucket, + const cws_stat_info *stat_info) { + sum->count += bucket->count; + stat_info->stat_add(sum->statistic, bucket->statistic); +} + +/* Add a proportion to an accumulating sum. */ +static void cws_add_proportion_to_sum(double p, cws_sum *sum, + const cws_bucket *bucket, + const cws_stat_info *stat_info) { + sum->count += p * bucket->count; + stat_info->stat_add_proportion(p, sum->statistic, bucket->statistic); +} + +void census_window_stats_get_sums(const window_stats *wstats, + const gpr_timespec when, cws_sum sums[]) { + int i; + int64_t when_ns = timespec_to_ns(when); + GPR_ASSERT(wstats->interval_stats != NULL); + for (i = 0; i < wstats->nintervals; i++) { + int when_bucket; + int new_bucket; + double last_proportion = 1.0; + double bottom_proportion; + cws_interval_stats *is = wstats->interval_stats + i; + cws_sum *sum = sums + i; + sum->count = 0; + cws_initialize_statistic(sum->statistic, &wstats->stat_info); + if (when_ns < is->bottom) { + continue; + } + if (when_ns >= is->top) { + cws_shift_buckets(wstats, is, when_ns); + } + /* Calculating the appropriate amount of which buckets to use can get + complicated. Essentially there are two cases: + 1) if the "top" bucket (new_bucket, where the newest additions to the + stats recorded are entered) corresponds to 'when', then we need + to take a proportion of it - (if when < newest_time) or the full + thing. We also (possibly) need to take a corresponding + proportion of the bottom bucket. + 2) Other cases, we just take a straight proportion. + */ + when_bucket = (when_ns - is->bottom) / is->width; + new_bucket = (wstats->newest_time - is->bottom) / is->width; + if (new_bucket == when_bucket) { + int64_t bottom_bucket_time = is->bottom + when_bucket * is->width; + if (when_ns < wstats->newest_time) { + last_proportion = (double)(when_ns - bottom_bucket_time) / + (double)(wstats->newest_time - bottom_bucket_time); + bottom_proportion = + (double)(is->width - (when_ns - bottom_bucket_time)) / is->width; + } else { + bottom_proportion = + (double)(is->width - (wstats->newest_time - bottom_bucket_time)) / + is->width; + } + } else { + last_proportion = + (double)(when_ns + 1 - is->bottom - when_bucket * is->width) / + is->width; + bottom_proportion = 1.0 - last_proportion; + } + cws_add_proportion_to_sum(last_proportion, sum, + is->buckets + BUCKET_IDX(is, when_bucket, wstats), + &wstats->stat_info); + if (when_bucket != 0) { /* last bucket isn't also bottom bucket */ + int b; + /* Add all of "bottom" bucket if we are looking at a subset of the + full interval, or a proportion if we are adding full interval. */ + cws_add_proportion_to_sum( + (when_bucket == wstats->nbuckets - 1 ? bottom_proportion : 1.0), sum, + is->buckets + is->bottom_bucket, &wstats->stat_info); + /* Add all the remaining buckets (everything but top and bottom). */ + for (b = 1; b < when_bucket; b++) { + cws_add_bucket_to_sum(sum, is->buckets + BUCKET_IDX(is, b, wstats), + &wstats->stat_info); + } + } + } +} + +void census_window_stats_destroy(window_stats *wstats) { + int i; + GPR_ASSERT(wstats->interval_stats != NULL); + for (i = 0; i < wstats->nintervals; i++) { + int b; + for (b = 0; b < wstats->nbuckets; b++) { + gpr_free(wstats->interval_stats[i].buckets[b].statistic); + } + gpr_free(wstats->interval_stats[i].buckets); + } + gpr_free(wstats->interval_stats); + /* Ensure any use-after free triggers assert. */ + wstats->interval_stats = NULL; + gpr_free(wstats); +} diff --git a/src/core/ext/census/window_stats.h b/src/core/ext/census/window_stats.h new file mode 100644 index 0000000000..25658c9ce0 --- /dev/null +++ b/src/core/ext/census/window_stats.h @@ -0,0 +1,173 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CENSUS_WINDOW_STATS_H +#define GRPC_CORE_EXT_CENSUS_WINDOW_STATS_H + +#include <grpc/support/time.h> + +/* Keep rolling sums of a user-defined statistic (containing a number of + measurements) over a a number of time intervals ("windows"). For example, + you can use a window_stats object to answer questions such as + "Approximately how many RPCs/s did I receive over the past minute, and + approximately how many bytes did I send out over that period?". + + The type of data to record, and the time intervals to keep are specified + when creating the object via a call to census_window_stats_create(). + + A window's interval is divided into one or more "buckets"; the interval + must be divisible by the number of buckets. Internally, these buckets + control the granularity of window_stats' measurements. Increasing the + number of buckets lets the object respond more quickly to changes in the + overall rate of data added into the object, at the cost of additional + memory usage. + + Here's some code which keeps one minute/hour measurements for two values + (latency in seconds and bytes transferred), with each interval divided into + 4 buckets. + + typedef struct my_stat { + double latency; + int bytes; + } my_stat; + + void add_my_stat(void* base, const void* addme) { + my_stat* b = (my_stat*)base; + const my_stat* a = (const my_stat*)addme; + b->latency += a->latency; + b->bytes += a->bytes; + } + + void add_proportion_my_stat(double p, void* base, const void* addme) { + (my_stat*)result->latency += p * (const my_stat*)base->latency; + (my_stat*)result->bytes += p * (const my_stat*)base->bytes; + } + + #define kNumIntervals 2 + #define kMinInterval 0 + #define kHourInterval 1 + #define kNumBuckets 4 + + const struct census_window_stats_stat_info kMyStatInfo + = { sizeof(my_stat), NULL, add_my_stat, add_proportion_my_stat }; + gpr_timespec intervals[kNumIntervals] = {{60, 0}, {3600, 0}}; + my_stat stat; + my_stat sums[kNumIntervals]; + census_window_stats_sums result[kNumIntervals]; + struct census_window_stats* stats + = census_window_stats_create(kNumIntervals, intervals, kNumBuckets, + &kMyStatInfo); + // Record a new event, taking 15.3ms, transferring 1784 bytes. + stat.latency = 0.153; + stat.bytes = 1784; + census_window_stats_add(stats, gpr_now(GPR_CLOCK_REALTIME), &stat); + // Get sums and print them out + result[kMinInterval].statistic = &sums[kMinInterval]; + result[kHourInterval].statistic = &sums[kHourInterval]; + census_window_stats_get_sums(stats, gpr_now(GPR_CLOCK_REALTIME), result); + printf("%d events/min, average time %gs, average bytes %g\n", + result[kMinInterval].count, + (my_stat*)result[kMinInterval].statistic->latency / + result[kMinInterval].count, + (my_stat*)result[kMinInterval].statistic->bytes / + result[kMinInterval].count + ); + printf("%d events/hr, average time %gs, average bytes %g\n", + result[kHourInterval].count, + (my_stat*)result[kHourInterval].statistic->latency / + result[kHourInterval].count, + (my_stat*)result[kHourInterval].statistic->bytes / + result[kHourInterval].count + ); +*/ + +/* Opaque structure for representing window_stats object */ +struct census_window_stats; + +/* Information provided by API user on the information they want to record */ +typedef struct census_window_stats_stat_info { + /* Number of bytes in user-defined object. */ + size_t stat_size; + /* Function to initialize a user-defined statistics object. If this is set + * to NULL, then the object will be zero-initialized. */ + void (*stat_initialize)(void *stat); + /* Function to add one user-defined statistics object ('addme') to 'base' */ + void (*stat_add)(void *base, const void *addme); + /* As for previous function, but only add a proportion 'p'. This API will + currently only use 'p' values in the range [0,1], but other values are + possible in the future, and should be supported. */ + void (*stat_add_proportion)(double p, void *base, const void *addme); +} census_window_stats_stat_info; + +/* Create a new window_stats object. 'nintervals' is the number of + 'intervals', and must be >=1. 'granularity' is the number of buckets, with + a larger number using more memory, but providing greater accuracy of + results. 'granularity should be > 2. We also require that each interval be + at least 10 * 'granularity' nanoseconds in size. 'stat_info' contains + information about the statistic to be gathered. Intervals greater than ~192 + years will be treated as essentially infinite in size. This function will + GPR_ASSERT() if the object cannot be created or any of the parameters have + invalid values. This function is thread-safe. */ +struct census_window_stats *census_window_stats_create( + int nintervals, const gpr_timespec intervals[], int granularity, + const census_window_stats_stat_info *stat_info); + +/* Add a new measurement (in 'stat_value'), as of a given time ('when'). + This function is thread-compatible. */ +void census_window_stats_add(struct census_window_stats *wstats, + const gpr_timespec when, const void *stat_value); + +/* Structure used to record a single intervals sum for a given statistic */ +typedef struct census_window_stats_sum { + /* Total count of samples. Note that because some internal interpolation + is performed, the count of samples returned for each interval may not be an + integral value. */ + double count; + /* Sum for statistic */ + void *statistic; +} census_window_stats_sums; + +/* Retrieve a set of all values stored in a window_stats object 'wstats'. The + number of 'sums' MUST be the same as the number 'nintervals' used in + census_window_stats_create(). This function is thread-compatible. */ +void census_window_stats_get_sums(const struct census_window_stats *wstats, + const gpr_timespec when, + struct census_window_stats_sum sums[]); + +/* Destroy a window_stats object. Once this function has been called, the + object will no longer be usable from any of the above functions (and + calling them will most likely result in a NULL-pointer dereference or + assertion failure). This function is thread-compatible. */ +void census_window_stats_destroy(struct census_window_stats *wstats); + +#endif /* GRPC_CORE_EXT_CENSUS_WINDOW_STATS_H */ diff --git a/src/core/ext/resolver/dns/native/README.md b/src/core/ext/resolver/dns/native/README.md new file mode 100644 index 0000000000..695de47b9f --- /dev/null +++ b/src/core/ext/resolver/dns/native/README.md @@ -0,0 +1,2 @@ +dns: scheme name resolution, using getaddrbyname +(or other OS specific implementation) diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c new file mode 100644 index 0000000000..70d8a3fe2d --- /dev/null +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -0,0 +1,299 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/client_config/lb_policy_registry.h" +#include "src/core/lib/client_config/resolver_registry.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/backoff.h" +#include "src/core/lib/support/string.h" + +#define BACKOFF_MULTIPLIER 1.6 +#define BACKOFF_JITTER 0.2 +#define BACKOFF_MIN_SECONDS 1 +#define BACKOFF_MAX_SECONDS 120 + +typedef struct { + /** base class: must be first */ + grpc_resolver base; + /** refcount */ + gpr_refcount refs; + /** name to resolve */ + char *name; + /** default port to use */ + char *default_port; + /** subchannel factory */ + grpc_subchannel_factory *subchannel_factory; + /** load balancing policy name */ + char *lb_policy_name; + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** are we currently resolving? */ + int resolving; + /** which version of resolved_config have we published? */ + int published_version; + /** which version of resolved_config is current? */ + int resolved_version; + /** pending next completion, or NULL */ + grpc_closure *next_completion; + /** target config address for next completion */ + grpc_client_config **target_config; + /** current (fully resolved) config */ + grpc_client_config *resolved_config; + /** retry timer */ + bool have_retry_timer; + grpc_timer retry_timer; + /** retry backoff state */ + gpr_backoff backoff_state; +} dns_resolver; + +static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); + +static void dns_start_resolving_locked(dns_resolver *r); +static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r); + +static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, + grpc_closure *on_complete); + +static const grpc_resolver_vtable dns_resolver_vtable = { + dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + +static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { + dns_resolver *r = (dns_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->have_retry_timer) { + grpc_timer_cancel(exec_ctx, &r->retry_timer); + } + if (r->next_completion != NULL) { + *r->target_config = NULL; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); + r->next_completion = NULL; + } + gpr_mu_unlock(&r->mu); +} + +static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + dns_resolver *r = (dns_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (!r->resolving) { + gpr_backoff_reset(&r->backoff_state); + dns_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); +} + +static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + dns_resolver *r = (dns_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(!r->next_completion); + r->next_completion = on_complete; + r->target_config = target_config; + if (r->resolved_version == 0 && !r->resolving) { + gpr_backoff_reset(&r->backoff_state); + dns_start_resolving_locked(r); + } else { + dns_maybe_finish_next_locked(exec_ctx, r); + } + gpr_mu_unlock(&r->mu); +} + +static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + dns_resolver *r = arg; + + gpr_mu_lock(&r->mu); + r->have_retry_timer = false; + if (success) { + if (!r->resolving) { + dns_start_resolving_locked(r); + } + } + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); +} + +static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, + grpc_resolved_addresses *addresses) { + dns_resolver *r = arg; + grpc_client_config *config = NULL; + grpc_lb_policy *lb_policy; + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving); + r->resolving = 0; + if (addresses != NULL) { + grpc_lb_policy_args lb_policy_args; + config = grpc_client_config_create(); + memset(&lb_policy_args, 0, sizeof(lb_policy_args)); + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + } + grpc_resolved_addresses_destroy(addresses); + } else { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); + gpr_timespec timeout = gpr_time_sub(next_try, now); + gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d.%09d seconds", + timeout.tv_sec, timeout.tv_nsec); + GPR_ASSERT(!r->have_retry_timer); + r->have_retry_timer = true; + GRPC_RESOLVER_REF(&r->base, "retry-timer"); + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r, + now); + } + if (r->resolved_config) { + grpc_client_config_unref(exec_ctx, r->resolved_config); + } + r->resolved_config = config; + r->resolved_version++; + dns_maybe_finish_next_locked(exec_ctx, r); + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); +} + +static void dns_start_resolving_locked(dns_resolver *r) { + GRPC_RESOLVER_REF(&r->base, "dns-resolving"); + GPR_ASSERT(!r->resolving); + r->resolving = 1; + grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); +} + +static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r) { + if (r->next_completion != NULL && + r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + if (r->resolved_config) { + grpc_client_config_ref(r->resolved_config); + } + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); + r->next_completion = NULL; + r->published_version = r->resolved_version; + } +} + +static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { + dns_resolver *r = (dns_resolver *)gr; + gpr_mu_destroy(&r->mu); + if (r->resolved_config) { + grpc_client_config_unref(exec_ctx, r->resolved_config); + } + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); + gpr_free(r->name); + gpr_free(r->default_port); + gpr_free(r->lb_policy_name); + gpr_free(r); +} + +static grpc_resolver *dns_create(grpc_resolver_args *args, + const char *default_port, + const char *lb_policy_name) { + dns_resolver *r; + const char *path = args->uri->path; + + if (0 != strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based dns uri's not supported"); + return NULL; + } + + if (path[0] == '/') ++path; + + r = gpr_malloc(sizeof(dns_resolver)); + memset(r, 0, sizeof(*r)); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &dns_resolver_vtable); + r->name = gpr_strdup(path); + r->default_port = gpr_strdup(default_port); + r->subchannel_factory = args->subchannel_factory; + gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, + BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); + grpc_subchannel_factory_ref(r->subchannel_factory); + r->lb_policy_name = gpr_strdup(lb_policy_name); + return &r->base; +} + +/* + * FACTORY + */ + +static void dns_factory_ref(grpc_resolver_factory *factory) {} + +static void dns_factory_unref(grpc_resolver_factory *factory) {} + +static grpc_resolver *dns_factory_create_resolver( + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return dns_create(args, "https", "pick_first"); +} + +static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, + grpc_uri *uri) { + const char *path = uri->path; + if (path[0] == '/') ++path; + return gpr_strdup(path); +} + +static const grpc_resolver_factory_vtable dns_factory_vtable = { + dns_factory_ref, dns_factory_unref, dns_factory_create_resolver, + dns_factory_get_default_host_name, "dns"}; +static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable}; + +static grpc_resolver_factory *dns_resolver_factory_create() { + return &dns_resolver_factory; +} + +void grpc_resolver_dns_native_init(void) { + grpc_register_resolver_type(dns_resolver_factory_create()); +} + +void grpc_resolver_dns_native_shutdown(void) {} diff --git a/src/core/ext/resolver/sockaddr/README.md b/src/core/ext/resolver/sockaddr/README.md new file mode 100644 index 0000000000..e307ba88f5 --- /dev/null +++ b/src/core/ext/resolver/sockaddr/README.md @@ -0,0 +1 @@ +Support for resolving ipv4:, ipv6:, unix: schemes diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c new file mode 100644 index 0000000000..69595ca3db --- /dev/null +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -0,0 +1,361 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/client_config/lb_policy_registry.h" +#include "src/core/lib/client_config/resolver_registry.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/support/string.h" + +typedef struct { + /** base class: must be first */ + grpc_resolver base; + /** refcount */ + gpr_refcount refs; + /** subchannel factory */ + grpc_subchannel_factory *subchannel_factory; + /** load balancing policy name */ + char *lb_policy_name; + + /** the addresses that we've 'resolved' */ + grpc_resolved_addresses *addresses; + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** have we published? */ + int published; + /** pending next completion, or NULL */ + grpc_closure *next_completion; + /** target config address for next completion */ + grpc_client_config **target_config; +} sockaddr_resolver; + +static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); + +static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + sockaddr_resolver *r); + +static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, + grpc_closure *on_complete); + +static const grpc_resolver_vtable sockaddr_resolver_vtable = { + sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, + sockaddr_next}; + +static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); + r->next_completion = NULL; + } + gpr_mu_unlock(&r->mu); +} + +static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + r->published = 0; + sockaddr_maybe_finish_next_locked(exec_ctx, r); + gpr_mu_unlock(&r->mu); +} + +static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(!r->next_completion); + r->next_completion = on_complete; + r->target_config = target_config; + sockaddr_maybe_finish_next_locked(exec_ctx, r); + gpr_mu_unlock(&r->mu); +} + +static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + sockaddr_resolver *r) { + if (r->next_completion != NULL && !r->published) { + grpc_client_config *cfg = grpc_client_config_create(); + grpc_lb_policy_args lb_policy_args; + memset(&lb_policy_args, 0, sizeof(lb_policy_args)); + lb_policy_args.addresses = r->addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + grpc_lb_policy *lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + grpc_client_config_set_lb_policy(cfg, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); + r->published = 1; + *r->target_config = cfg; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); + r->next_completion = NULL; + } +} + +static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { + sockaddr_resolver *r = (sockaddr_resolver *)gr; + gpr_mu_destroy(&r->mu); + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); + grpc_resolved_addresses_destroy(r->addresses); + gpr_free(r->lb_policy_name); + gpr_free(r); +} + +static char *ip_get_default_authority(grpc_uri *uri) { + const char *path = uri->path; + if (path[0] == '/') ++path; + return gpr_strdup(path); +} + +static char *ipv4_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return ip_get_default_authority(uri); +} + +static char *ipv6_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return ip_get_default_authority(uri); +} + +static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { + const char *host_port = uri->path; + char *host; + char *port; + int port_num; + int result = 0; + struct sockaddr_in *in = (struct sockaddr_in *)addr; + + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } + + memset(in, 0, sizeof(*in)); + *len = sizeof(*in); + in->sin_family = AF_INET; + if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); + goto done; + } + + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); + goto done; + } + in->sin_port = htons((uint16_t)port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); + goto done; + } + + result = 1; +done: + gpr_free(host); + gpr_free(port); + return result; +} + +static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { + const char *host_port = uri->path; + char *host; + char *port; + int port_num; + int result = 0; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; + + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } + + memset(in6, 0, sizeof(*in6)); + *len = sizeof(*in6); + in6->sin6_family = AF_INET6; + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } + + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); + goto done; + } + in6->sin6_port = htons((uint16_t)port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); + goto done; + } + + result = 1; +done: + gpr_free(host); + gpr_free(port); + return result; +} + +static void do_nothing(void *ignored) {} + +static grpc_resolver *sockaddr_create( + grpc_resolver_args *args, const char *default_lb_policy_name, + int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { + int errors_found = 0; /* GPR_FALSE */ + sockaddr_resolver *r; + gpr_slice path_slice; + gpr_slice_buffer path_parts; + + if (0 != strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", + args->uri->scheme); + return NULL; + } + + r = gpr_malloc(sizeof(sockaddr_resolver)); + memset(r, 0, sizeof(*r)); + + r->lb_policy_name = NULL; + if (0 != strcmp(args->uri->query, "")) { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = + gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing); + gpr_slice_buffer_init(&query_parts); + gpr_slice_split(query_slice, "=", &query_parts); + GPR_ASSERT(query_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { + r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + } + gpr_slice_buffer_destroy(&query_parts); + gpr_slice_unref(query_slice); + } + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } + + path_slice = + gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); + gpr_slice_buffer_init(&path_parts); + + gpr_slice_split(path_slice, ",", &path_parts); + r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->addresses->naddrs = path_parts.count; + r->addresses->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs); + + for (size_t i = 0; i < r->addresses->naddrs; i++) { + grpc_uri ith_uri = *args->uri; + char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); + ith_uri.path = part_str; + if (!parse(&ith_uri, + (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), + &r->addresses->addrs[i].len)) { + errors_found = 1; /* GPR_TRUE */ + } + gpr_free(part_str); + if (errors_found) break; + } + + gpr_slice_buffer_destroy(&path_parts); + gpr_slice_unref(path_slice); + if (errors_found) { + gpr_free(r->lb_policy_name); + grpc_resolved_addresses_destroy(r->addresses); + gpr_free(r); + return NULL; + } + + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + + return &r->base; +} + +/* + * FACTORY + */ + +static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} + +static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} + +#define DECL_FACTORY(name, prefix) \ + static grpc_resolver *name##_factory_create_resolver( \ + grpc_resolver_factory *factory, grpc_resolver_args *args) { \ + return sockaddr_create(args, "pick_first", prefix##parse_##name); \ + } \ + static const grpc_resolver_factory_vtable name##_factory_vtable = { \ + sockaddr_factory_ref, sockaddr_factory_unref, \ + name##_factory_create_resolver, prefix##name##_get_default_authority, \ + #name}; \ + static grpc_resolver_factory name##_resolver_factory = { \ + &name##_factory_vtable} + +#ifdef GPR_HAVE_UNIX_SOCKET +DECL_FACTORY(unix, grpc_); +#endif +DECL_FACTORY(ipv4, ); +DECL_FACTORY(ipv6, ); + +void grpc_resolver_sockaddr_init(void) { + grpc_register_resolver_type(&ipv4_resolver_factory); + grpc_register_resolver_type(&ipv6_resolver_factory); +#ifdef GPR_HAVE_UNIX_SOCKET + grpc_register_resolver_type(&unix_resolver_factory); +#endif +} + +void grpc_resolver_sockaddr_shutdown(void) {} diff --git a/src/core/ext/resolver/zookeeper/README.md b/src/core/ext/resolver/zookeeper/README.md new file mode 100644 index 0000000000..ce6f39683b --- /dev/null +++ b/src/core/ext/resolver/zookeeper/README.md @@ -0,0 +1 @@ +Zookeeper based name resolver: WIP diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c new file mode 100644 index 0000000000..5acb0940c6 --- /dev/null +++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c @@ -0,0 +1,512 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include <grpc/grpc_zookeeper.h> +#include <zookeeper/zookeeper.h> + +#include "src/core/lib/client_config/lb_policy_registry.h" +#include "src/core/lib/client_config/resolver_registry.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/api_trace.h" + +/** Zookeeper session expiration time in milliseconds */ +#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 + +typedef struct { + /** base class: must be first */ + grpc_resolver base; + /** refcount */ + gpr_refcount refs; + /** name to resolve */ + char *name; + /** subchannel factory */ + grpc_subchannel_factory *subchannel_factory; + /** load balancing policy name */ + char *lb_policy_name; + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** are we currently resolving? */ + int resolving; + /** which version of resolved_config have we published? */ + int published_version; + /** which version of resolved_config is current? */ + int resolved_version; + /** pending next completion, or NULL */ + grpc_closure *next_completion; + /** target config address for next completion */ + grpc_client_config **target_config; + /** current (fully resolved) config */ + grpc_client_config *resolved_config; + + /** zookeeper handle */ + zhandle_t *zookeeper_handle; + /** zookeeper resolved addresses */ + grpc_resolved_addresses *resolved_addrs; + /** total number of addresses to be resolved */ + int resolved_total; + /** number of addresses resolved */ + int resolved_num; +} zookeeper_resolver; + +static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); + +static void zookeeper_start_resolving_locked(zookeeper_resolver *r); +static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + zookeeper_resolver *r); + +static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, + grpc_closure *on_complete); + +static const grpc_resolver_vtable zookeeper_resolver_vtable = { + zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, + zookeeper_next}; + +static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + grpc_closure *call = NULL; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + call = r->next_completion; + r->next_completion = NULL; + } + zookeeper_close(r->zookeeper_handle); + gpr_mu_unlock(&r->mu); + if (call != NULL) { + call->cb(exec_ctx, call->cb_arg, 1); + } +} + +static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); +} + +static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->next_completion == NULL); + r->next_completion = on_complete; + r->target_config = target_config; + if (r->resolved_version == 0 && r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } else { + zookeeper_maybe_finish_next_locked(exec_ctx, r); + } + gpr_mu_unlock(&r->mu); +} + +/** Zookeeper global watcher for connection management + TODO: better connection management besides logs */ +static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, + int state, const char *path, + void *watcher_ctx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_EXPIRED_SESSION_STATE) { + gpr_log(GPR_ERROR, "Zookeeper session expired"); + } else if (state == ZOO_AUTH_FAILED_STATE) { + gpr_log(GPR_ERROR, "Zookeeper authentication failed"); + } + } +} + +/** Zookeeper watcher triggered by changes to watched nodes + Once triggered, it tries to resolve again to get updated addresses */ +static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, + const char *path, void *watcher_ctx) { + if (watcher_ctx != NULL) { + zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx; + if (state == ZOO_CONNECTED_STATE) { + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); + } + } +} + +/** Callback function after getting all resolved addresses + Creates a subchannel for each address */ +static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, + grpc_resolved_addresses *addresses) { + zookeeper_resolver *r = arg; + grpc_client_config *config = NULL; + grpc_lb_policy *lb_policy; + + if (addresses != NULL) { + grpc_lb_policy_args lb_policy_args; + config = grpc_client_config_create(); + + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + } + grpc_resolved_addresses_destroy(addresses); + } + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving == 1); + r->resolving = 0; + if (r->resolved_config != NULL) { + grpc_client_config_unref(exec_ctx, r->resolved_config); + } + r->resolved_config = config; + r->resolved_version++; + zookeeper_maybe_finish_next_locked(exec_ctx, r); + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "zookeeper-resolving"); +} + +/** Callback function for each DNS resolved address */ +static void zookeeper_dns_resolved(grpc_exec_ctx *exec_ctx, void *arg, + grpc_resolved_addresses *addresses) { + size_t i; + zookeeper_resolver *r = arg; + int resolve_done = 0; + + gpr_mu_lock(&r->mu); + r->resolved_num++; + r->resolved_addrs->addrs = + gpr_realloc(r->resolved_addrs->addrs, + sizeof(grpc_resolved_address) * + (r->resolved_addrs->naddrs + addresses->naddrs)); + for (i = 0; i < addresses->naddrs; i++) { + memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, + addresses->addrs[i].addr, addresses->addrs[i].len); + r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = + addresses->addrs[i].len; + } + + r->resolved_addrs->naddrs += addresses->naddrs; + grpc_resolved_addresses_destroy(addresses); + + /** Wait for all addresses to be resolved */ + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(exec_ctx, r, r->resolved_addrs); + } +} + +/** Parses JSON format address of a zookeeper node */ +static char *zookeeper_parse_address(const char *value, size_t value_len) { + grpc_json *json; + grpc_json *cur; + const char *host; + const char *port; + char *buffer; + char *address = NULL; + + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + json = grpc_json_parse_string_with_len(buffer, value_len); + if (json != NULL) { + host = NULL; + port = NULL; + for (cur = json->child; cur != NULL; cur = cur->next) { + if (!strcmp(cur->key, "host")) { + host = cur->value; + if (port != NULL) { + break; + } + } else if (!strcmp(cur->key, "port")) { + port = cur->value; + if (host != NULL) { + break; + } + } + } + if (host != NULL && port != NULL) { + gpr_asprintf(&address, "%s:%s", host, port); + } + grpc_json_destroy(json); + } + gpr_free(buffer); + + return address; +} + +static void zookeeper_get_children_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { + char *address = NULL; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + int resolve_done = 0; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); + grpc_exec_ctx_finish(&exec_ctx); + return; + } + + address = zookeeper_parse_address(value, (size_t)value_len); + if (address != NULL) { + /** Further resolves address by DNS */ + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + } else { + gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); + gpr_mu_lock(&r->mu); + r->resolved_total--; + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(&exec_ctx, r, r->resolved_addrs); + } + } + + grpc_exec_ctx_finish(&exec_ctx); +} + +static void zookeeper_get_children_completion( + int rc, const struct String_vector *children, const void *arg) { + char *path; + int status; + int i; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + return; + } + + if (children->count == 0) { + gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name); + return; + } + + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = children->count; + + /** TODO: Replace expensive heap allocation with stack + if we can get maximum length of zookeeper path */ + for (i = 0; i < children->count; i++) { + gpr_asprintf(&path, "%s/%s", r->name, children->data[i]); + status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, + zookeeper_get_children_node_completion, r); + gpr_free(path); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); + } + } +} + +static void zookeeper_get_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { + int status; + char *address = NULL; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + r->resolved_addrs = NULL; + r->resolved_total = 0; + r->resolved_num = 0; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + return; + } + + /** If zookeeper node of path r->name does not have address + (i.e. service node), get its children */ + address = zookeeper_parse_address(value, (size_t)value_len); + if (address != NULL) { + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = 1; + /** Further resolves address by DNS */ + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + return; + } + + status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, + r, zookeeper_get_children_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + } +} + +static void zookeeper_resolve_address(zookeeper_resolver *r) { + int status; + status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, + zookeeper_get_node_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + } +} + +static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { + GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); + GPR_ASSERT(r->resolving == 0); + r->resolving = 1; + zookeeper_resolve_address(r); +} + +static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + zookeeper_resolver *r) { + if (r->next_completion != NULL && + r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + if (r->resolved_config != NULL) { + grpc_client_config_ref(r->resolved_config); + } + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); + r->next_completion = NULL; + r->published_version = r->resolved_version; + } +} + +static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { + zookeeper_resolver *r = (zookeeper_resolver *)gr; + gpr_mu_destroy(&r->mu); + if (r->resolved_config != NULL) { + grpc_client_config_unref(exec_ctx, r->resolved_config); + } + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); + gpr_free(r->name); + gpr_free(r->lb_policy_name); + gpr_free(r); +} + +static grpc_resolver *zookeeper_create(grpc_resolver_args *args, + const char *lb_policy_name) { + zookeeper_resolver *r; + size_t length; + char *path = args->uri->path; + + if (0 == strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); + return NULL; + } + + /** Removes the trailing slash if exists */ + length = strlen(path); + if (length > 1 && path[length - 1] == '/') { + path[length - 1] = 0; + } + + r = gpr_malloc(sizeof(zookeeper_resolver)); + memset(r, 0, sizeof(*r)); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); + r->name = gpr_strdup(path); + + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + + r->lb_policy_name = gpr_strdup(lb_policy_name); + + /** Initializes zookeeper client */ + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + r->zookeeper_handle = + zookeeper_init(args->uri->authority, zookeeper_global_watcher, + GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); + if (r->zookeeper_handle == NULL) { + gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); + return NULL; + } + + return &r->base; +} + +/* + * FACTORY + */ + +static void zookeeper_factory_ref(grpc_resolver_factory *factory) {} + +static void zookeeper_factory_unref(grpc_resolver_factory *factory) {} + +static char *zookeeper_factory_get_default_hostname( + grpc_resolver_factory *factory, grpc_uri *uri) { + return NULL; +} + +static grpc_resolver *zookeeper_factory_create_resolver( + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return zookeeper_create(args, "pick_first"); +} + +static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { + zookeeper_factory_ref, zookeeper_factory_unref, + zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname, + "zookeeper"}; + +static grpc_resolver_factory zookeeper_resolver_factory = { + &zookeeper_factory_vtable}; + +static grpc_resolver_factory *zookeeper_resolver_factory_create() { + return &zookeeper_resolver_factory; +} + +static void zookeeper_plugin_init() { + grpc_register_resolver_type(zookeeper_resolver_factory_create()); +} + +void grpc_zookeeper_register() { + GRPC_API_TRACE("grpc_zookeeper_register(void)", 0, ()); + grpc_register_plugin(zookeeper_plugin_init, NULL); +} diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index a5f7c6cfbc..606fff5fb4 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -40,8 +40,8 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> +#include "src/core/ext/census/grpc_filter.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" -#include "src/core/lib/census/grpc_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/client_channel.h" #include "src/core/lib/channel/compress_filter.h" |