aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/README.md76
-rw-r--r--src/core/ext/census/aggregation.h66
-rw-r--r--src/core/ext/census/census_init.c48
-rw-r--r--src/core/ext/census/census_interface.h76
-rw-r--r--src/core/ext/census/census_log.c603
-rw-r--r--src/core/ext/census/census_log.h91
-rw-r--r--src/core/ext/census/census_rpc_stats.c253
-rw-r--r--src/core/ext/census/census_rpc_stats.h101
-rw-r--r--src/core/ext/census/census_tracing.c241
-rw-r--r--src/core/ext/census/census_tracing.h96
-rw-r--r--src/core/ext/census/context.c509
-rw-r--r--src/core/ext/census/grpc_context.c53
-rw-r--r--src/core/ext/census/grpc_filter.c198
-rw-r--r--src/core/ext/census/grpc_filter.h44
-rw-r--r--src/core/ext/census/grpc_plugin.c68
-rw-r--r--src/core/ext/census/hash_table.c303
-rw-r--r--src/core/ext/census/hash_table.h131
-rw-r--r--src/core/ext/census/initialize.c54
-rw-r--r--src/core/ext/census/mlog.c600
-rw-r--r--src/core/ext/census/mlog.h95
-rw-r--r--src/core/ext/census/operation.c63
-rw-r--r--src/core/ext/census/placeholders.c109
-rw-r--r--src/core/ext/census/rpc_metric_id.h51
-rw-r--r--src/core/ext/census/tracing.c45
-rw-r--r--src/core/ext/census/window_stats.c316
-rw-r--r--src/core/ext/census/window_stats.h173
-rw-r--r--src/core/ext/resolver/dns/native/README.md2
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c299
-rw-r--r--src/core/ext/resolver/sockaddr/README.md1
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c363
-rw-r--r--src/core/ext/resolver/zookeeper/README.md1
-rw-r--r--src/core/ext/resolver/zookeeper/zookeeper_resolver.c512
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c2
33 files changed, 5642 insertions, 1 deletions
diff --git a/src/core/ext/census/README.md b/src/core/ext/census/README.md
new file mode 100644
index 0000000000..fb615a2194
--- /dev/null
+++ b/src/core/ext/census/README.md
@@ -0,0 +1,76 @@
+<!---
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+# Census - a resource measurement and tracing system
+
+This directory contains code for Census, which will ultimately provide the
+following features for any gRPC-using system:
+* A [dapper](http://research.google.com/pubs/pub36356.html)-like tracing
+ system, enabling tracing across a distributed infrastructure.
+* RPC statistics and measurements for key metrics, such as latency, bytes
+ transferred, number of errors etc.
+* Resource measurement framework which can be used for measuring custom
+ metrics. Through the use of [tags](#Tags), these can be broken down across
+ the entire distributed stack.
+* Easy integration of the above with
+ [Google Cloud Trace](https://cloud.google.com/tools/cloud-trace) and
+ [Google Cloud Monitoring](https://cloud.google.com/monitoring/).
+
+## Concepts
+
+### Context
+
+### Operations
+
+### Tags
+
+### Metrics
+
+## API
+
+### Internal/RPC API
+
+### External/Client API
+
+### RPC API
+
+## Files in this directory
+
+Note that files and functions in this directory can be split into two
+categories:
+* Files that define core census library functions. Functions etc. in these
+ files are named census\_\*, and constitute the core census library
+ functionality. At some time in the future, these will become a standalone
+ library.
+* Files that define functions etc. that provide a convenient interface between
+ grpc and the core census functionality. These files are all named
+ grpc\_\*.{c,h}, and define function names beginning with grpc\_census\_\*.
+
diff --git a/src/core/ext/census/aggregation.h b/src/core/ext/census/aggregation.h
new file mode 100644
index 0000000000..45f789c772
--- /dev/null
+++ b/src/core/ext/census/aggregation.h
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <stddef.h>
+
+#ifndef GRPC_CORE_EXT_CENSUS_AGGREGATION_H
+#define GRPC_CORE_EXT_CENSUS_AGGREGATION_H
+
+/** Structure used to describe an aggregation type. */
+struct census_aggregation_ops {
+ /* Create a new aggregation. The pointer returned can be used in future calls
+ to clone(), free(), record(), data() and reset(). */
+ void *(*create)(const void *create_arg);
+ /* Make a copy of an aggregation created by create() */
+ void *(*clone)(const void *aggregation);
+ /* Destroy an aggregation created by create() */
+ void (*free)(void *aggregation);
+ /* Record a new value against aggregation. */
+ void (*record)(void *aggregation, double value);
+ /* Return current aggregation data. The caller must cast this object into
+ the correct type for the aggregation result. The object returned can be
+ freed by using free_data(). */
+ void *(*data)(const void *aggregation);
+ /* free data returned by data() */
+ void (*free_data)(void *data);
+ /* Reset an aggregation to default (zero) values. */
+ void (*reset)(void *aggregation);
+ /* Merge 'from' aggregation into 'to'. Both aggregations must be compatible */
+ void (*merge)(void *to, const void *from);
+ /* Fill buffer with printable string version of aggregation contents. For
+ debugging only. Returns the number of bytes added to buffer (a value == n
+ implies the buffer was of insufficient size). */
+ size_t (*print)(const void *aggregation, char *buffer, size_t n);
+};
+
+#endif /* GRPC_CORE_EXT_CENSUS_AGGREGATION_H */
diff --git a/src/core/ext/census/census_init.c b/src/core/ext/census/census_init.c
new file mode 100644
index 0000000000..690b09e789
--- /dev/null
+++ b/src/core/ext/census/census_init.c
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/census_interface.h"
+
+#include <grpc/support/log.h>
+#include "src/core/ext/census/census_rpc_stats.h"
+#include "src/core/ext/census/census_tracing.h"
+
+void census_init(void) {
+ census_tracing_init();
+ census_stats_store_init();
+}
+
+void census_shutdown(void) {
+ census_stats_store_shutdown();
+ census_tracing_shutdown();
+}
diff --git a/src/core/ext/census/census_interface.h b/src/core/ext/census/census_interface.h
new file mode 100644
index 0000000000..57e75f56ee
--- /dev/null
+++ b/src/core/ext/census/census_interface.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_INTERFACE_H
+#define GRPC_CORE_EXT_CENSUS_CENSUS_INTERFACE_H
+
+#include <grpc/support/port_platform.h>
+
+/* Maximum length of an individual census trace annotation. */
+#define CENSUS_MAX_ANNOTATION_LENGTH 200
+
+/* Structure of a census op id. Define as structure because 64bit integer is not
+ available on every platform for C89. */
+typedef struct census_op_id {
+ uint32_t upper;
+ uint32_t lower;
+} census_op_id;
+
+typedef struct census_rpc_stats census_rpc_stats;
+
+/* Initializes Census library. No-op if Census is already initialized. */
+void census_init(void);
+
+/* Shutdown Census Library. */
+void census_shutdown(void);
+
+/* Annotates grpc method name on a census_op_id. The method name has the format
+ of <full quantified rpc service name>/<rpc function name>. Returns 0 iff
+ op_id and method_name are all valid. op_id is valid after its creation and
+ before calling census_tracing_end_op().
+
+ TODO(hongyu): Figure out valid characters set for service name and command
+ name and document requirements here.*/
+int census_add_method_tag(census_op_id op_id, const char *method_name);
+
+/* Annotates tracing information to a specific op_id.
+ Up to CENSUS_MAX_ANNOTATION_LENGTH bytes are recorded. */
+void census_tracing_print(census_op_id op_id, const char *annotation);
+
+/* Starts tracing for an RPC. Returns a locally unique census_op_id */
+census_op_id census_tracing_start_op(void);
+
+/* Ends tracing. Calling this function will invalidate the input op_id. */
+void census_tracing_end_op(census_op_id op_id);
+
+#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_INTERFACE_H */
diff --git a/src/core/ext/census/census_log.c b/src/core/ext/census/census_log.c
new file mode 100644
index 0000000000..9a7331adc2
--- /dev/null
+++ b/src/core/ext/census/census_log.c
@@ -0,0 +1,603 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Available log space is divided up in blocks of
+ CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the
+ following three data structures:
+ - Free blocks (free_block_list)
+ - Blocks with unread data (dirty_block_list)
+ - Blocks currently attached to cores (core_local_blocks[])
+
+ census_log_start_write() moves a block from core_local_blocks[] to the
+ end of dirty_block_list when block:
+ - is out-of-space OR
+ - has an incomplete record (an incomplete record occurs when a thread calls
+ census_log_start_write() and is context-switched before calling
+ census_log_end_write()
+ So, blocks in dirty_block_list are ordered, from oldest to newest, by time
+ when block is detached from the core.
+
+ census_log_read_next() first iterates over dirty_block_list and then
+ core_local_blocks[]. It moves completely read blocks from dirty_block_list
+ to free_block_list. Blocks in core_local_blocks[] are not freed, even when
+ completely read.
+
+ If log is configured to discard old records and free_block_list is empty,
+ census_log_start_write() iterates over dirty_block_list to allocate a
+ new block. It moves the oldest available block (no pending read/write) to
+ core_local_blocks[].
+
+ core_local_block_struct is used to implement a map from core id to the block
+ associated with that core. This mapping is advisory. It is possible that the
+ block returned by this mapping is no longer associated with that core. This
+ mapping is updated, lazily, by census_log_start_write().
+
+ Locking in block struct:
+
+ Exclusive g_log.lock must be held before calling any functions operatong on
+ block structs except census_log_start_write() and
+ census_log_end_write().
+
+ Writes to a block are serialized via writer_lock.
+ census_log_start_write() acquires this lock and
+ census_log_end_write() releases it. On failure to acquire the lock,
+ writer allocates a new block for the current core and updates
+ core_local_block accordingly.
+
+ Simultaneous read and write access is allowed. Reader can safely read up to
+ committed bytes (bytes_committed).
+
+ reader_lock protects the block, currently being read, from getting recycled.
+ start_read() acquires reader_lock and end_read() releases the lock.
+
+ Read/write access to a block is disabled via try_disable_access(). It returns
+ with both writer_lock and reader_lock held. These locks are subsequently
+ released by enable_access() to enable access to the block.
+
+ A note on naming: Most function/struct names are prepended by cl_
+ (shorthand for census_log). Further, functions that manipulate structures
+ include the name of the structure, which will be passed as the first
+ argument. E.g. cl_block_initialize() will initialize a cl_block.
+*/
+#include "src/core/ext/census/census_log.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/cpu.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+#include <string.h>
+
+/* End of platform specific code */
+
+typedef struct census_log_block_list_struct {
+ struct census_log_block_list_struct *next;
+ struct census_log_block_list_struct *prev;
+ struct census_log_block *block;
+} cl_block_list_struct;
+
+typedef struct census_log_block {
+ /* Pointer to underlying buffer */
+ char *buffer;
+ gpr_atm writer_lock;
+ gpr_atm reader_lock;
+ /* Keeps completely written bytes. Declared atomic because accessed
+ simultaneously by reader and writer. */
+ gpr_atm bytes_committed;
+ /* Bytes already read */
+ int32_t bytes_read;
+ /* Links for list */
+ cl_block_list_struct link;
+/* We want this structure to be cacheline aligned. We assume the following
+ sizes for the various parts on 32/64bit systems:
+ type 32b size 64b size
+ char* 4 8
+ 3x gpr_atm 12 24
+ int32_t 4 8 (assumes padding)
+ cl_block_list_struct 12 24
+ TOTAL 32 64
+
+ Depending on the size of our cacheline and the architecture, we
+ selectively add char buffering to this structure. The size is checked
+ via assert in census_log_initialize(). */
+#if defined(GPR_ARCH_64)
+#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64)
+#else
+#if defined(GPR_ARCH_32)
+#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32)
+#else
+#error "Unknown architecture"
+#endif
+#endif
+#if CL_BLOCK_PAD_SIZE > 0
+ char padding[CL_BLOCK_PAD_SIZE];
+#endif
+} cl_block;
+
+/* A list of cl_blocks, doubly-linked through cl_block::link. */
+typedef struct census_log_block_list {
+ int32_t count; /* Number of items in list. */
+ cl_block_list_struct ht; /* head/tail of linked list. */
+} cl_block_list;
+
+/* Cacheline aligned block pointers to avoid false sharing. Block pointer must
+ be initialized via set_block(), before calling other functions */
+typedef struct census_log_core_local_block {
+ gpr_atm block;
+/* Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 */
+#if defined(GPR_ARCH_64)
+#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8)
+#else
+#if defined(GPR_ARCH_32)
+#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4)
+#else
+#error "Unknown architecture"
+#endif
+#endif
+#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0
+ char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE];
+#endif
+} cl_core_local_block;
+
+struct census_log {
+ int discard_old_records;
+ /* Number of cores (aka hardware-contexts) */
+ unsigned num_cores;
+ /* number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log */
+ int32_t num_blocks;
+ cl_block *blocks; /* Block metadata. */
+ cl_core_local_block *core_local_blocks; /* Keeps core to block mappings. */
+ gpr_mu lock;
+ int initialized; /* has log been initialized? */
+ /* Keeps the state of the reader iterator. A value of 0 indicates that
+ iterator has reached the end. census_log_init_reader() resets the
+ value to num_core to restart iteration. */
+ uint32_t read_iterator_state;
+ /* Points to the block being read. If non-NULL, the block is locked for
+ reading (block_being_read_->reader_lock is held). */
+ cl_block *block_being_read;
+ /* A non-zero value indicates that log is full. */
+ gpr_atm is_full;
+ char *buffer;
+ cl_block_list free_block_list;
+ cl_block_list dirty_block_list;
+ gpr_atm out_of_space_count;
+};
+
+/* Single internal log */
+static struct census_log g_log;
+
+/* Functions that operate on an atomic memory location used as a lock */
+
+/* Returns non-zero if lock is acquired */
+static int cl_try_lock(gpr_atm *lock) { return gpr_atm_acq_cas(lock, 0, 1); }
+
+static void cl_unlock(gpr_atm *lock) { gpr_atm_rel_store(lock, 0); }
+
+/* Functions that operate on cl_core_local_block's */
+
+static void cl_core_local_block_set_block(cl_core_local_block *clb,
+ cl_block *block) {
+ gpr_atm_rel_store(&clb->block, (gpr_atm)block);
+}
+
+static cl_block *cl_core_local_block_get_block(cl_core_local_block *clb) {
+ return (cl_block *)gpr_atm_acq_load(&clb->block);
+}
+
+/* Functions that operate on cl_block_list_struct's */
+
+static void cl_block_list_struct_initialize(cl_block_list_struct *bls,
+ cl_block *block) {
+ bls->next = bls->prev = bls;
+ bls->block = block;
+}
+
+/* Functions that operate on cl_block_list's */
+
+static void cl_block_list_initialize(cl_block_list *list) {
+ list->count = 0;
+ cl_block_list_struct_initialize(&list->ht, NULL);
+}
+
+/* Returns head of *this, or NULL if empty. */
+static cl_block *cl_block_list_head(cl_block_list *list) {
+ return list->ht.next->block;
+}
+
+/* Insert element *e after *pos. */
+static void cl_block_list_insert(cl_block_list *list, cl_block_list_struct *pos,
+ cl_block_list_struct *e) {
+ list->count++;
+ e->next = pos->next;
+ e->prev = pos;
+ e->next->prev = e;
+ e->prev->next = e;
+}
+
+/* Insert block at the head of the list */
+static void cl_block_list_insert_at_head(cl_block_list *list, cl_block *block) {
+ cl_block_list_insert(list, &list->ht, &block->link);
+}
+
+/* Insert block at the tail of the list */
+static void cl_block_list_insert_at_tail(cl_block_list *list, cl_block *block) {
+ cl_block_list_insert(list, list->ht.prev, &block->link);
+}
+
+/* Removes block *b. Requires *b be in the list. */
+static void cl_block_list_remove(cl_block_list *list, cl_block *b) {
+ list->count--;
+ b->link.next->prev = b->link.prev;
+ b->link.prev->next = b->link.next;
+}
+
+/* Functions that operate on cl_block's */
+
+static void cl_block_initialize(cl_block *block, char *buffer) {
+ block->buffer = buffer;
+ gpr_atm_rel_store(&block->writer_lock, 0);
+ gpr_atm_rel_store(&block->reader_lock, 0);
+ gpr_atm_rel_store(&block->bytes_committed, 0);
+ block->bytes_read = 0;
+ cl_block_list_struct_initialize(&block->link, block);
+}
+
+/* Guards against exposing partially written buffer to the reader. */
+static void cl_block_set_bytes_committed(cl_block *block,
+ int32_t bytes_committed) {
+ gpr_atm_rel_store(&block->bytes_committed, bytes_committed);
+}
+
+static int32_t cl_block_get_bytes_committed(cl_block *block) {
+ return gpr_atm_acq_load(&block->bytes_committed);
+}
+
+/* Tries to disable future read/write access to this block. Succeeds if:
+ - no in-progress write AND
+ - no in-progress read AND
+ - 'discard_data' set to true OR no unread data
+ On success, clears the block state and returns with writer_lock_ and
+ reader_lock_ held. These locks are released by a subsequent
+ cl_block_access_enable() call. */
+static int cl_block_try_disable_access(cl_block *block, int discard_data) {
+ if (!cl_try_lock(&block->writer_lock)) {
+ return 0;
+ }
+ if (!cl_try_lock(&block->reader_lock)) {
+ cl_unlock(&block->writer_lock);
+ return 0;
+ }
+ if (!discard_data &&
+ (block->bytes_read != cl_block_get_bytes_committed(block))) {
+ cl_unlock(&block->reader_lock);
+ cl_unlock(&block->writer_lock);
+ return 0;
+ }
+ cl_block_set_bytes_committed(block, 0);
+ block->bytes_read = 0;
+ return 1;
+}
+
+static void cl_block_enable_access(cl_block *block) {
+ cl_unlock(&block->reader_lock);
+ cl_unlock(&block->writer_lock);
+}
+
+/* Returns with writer_lock held. */
+static void *cl_block_start_write(cl_block *block, size_t size) {
+ int32_t bytes_committed;
+ if (!cl_try_lock(&block->writer_lock)) {
+ return NULL;
+ }
+ bytes_committed = cl_block_get_bytes_committed(block);
+ if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) {
+ cl_unlock(&block->writer_lock);
+ return NULL;
+ }
+ return block->buffer + bytes_committed;
+}
+
+/* Releases writer_lock and increments committed bytes by 'bytes_written'.
+ 'bytes_written' must be <= 'size' specified in the corresponding
+ StartWrite() call. This function is thread-safe. */
+static void cl_block_end_write(cl_block *block, size_t bytes_written) {
+ cl_block_set_bytes_committed(
+ block, cl_block_get_bytes_committed(block) + bytes_written);
+ cl_unlock(&block->writer_lock);
+}
+
+/* Returns a pointer to the first unread byte in buffer. The number of bytes
+ available are returned in 'bytes_available'. Acquires reader lock that is
+ released by a subsequent cl_block_end_read() call. Returns NULL if:
+ - read in progress
+ - no data available */
+static void *cl_block_start_read(cl_block *block, size_t *bytes_available) {
+ void *record;
+ if (!cl_try_lock(&block->reader_lock)) {
+ return NULL;
+ }
+ /* bytes_committed may change from under us. Use bytes_available to update
+ bytes_read below. */
+ *bytes_available = cl_block_get_bytes_committed(block) - block->bytes_read;
+ if (*bytes_available == 0) {
+ cl_unlock(&block->reader_lock);
+ return NULL;
+ }
+ record = block->buffer + block->bytes_read;
+ block->bytes_read += *bytes_available;
+ return record;
+}
+
+static void cl_block_end_read(cl_block *block) {
+ cl_unlock(&block->reader_lock);
+}
+
+/* Internal functions operating on g_log */
+
+/* Allocates a new free block (or recycles an available dirty block if log is
+ configured to discard old records). Returns NULL if out-of-space. */
+static cl_block *cl_allocate_block(void) {
+ cl_block *block = cl_block_list_head(&g_log.free_block_list);
+ if (block != NULL) {
+ cl_block_list_remove(&g_log.free_block_list, block);
+ return block;
+ }
+ if (!g_log.discard_old_records) {
+ /* No free block and log is configured to keep old records. */
+ return NULL;
+ }
+ /* Recycle dirty block. Start from the oldest. */
+ for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL;
+ block = block->link.next->block) {
+ if (cl_block_try_disable_access(block, 1 /* discard data */)) {
+ cl_block_list_remove(&g_log.dirty_block_list, block);
+ return block;
+ }
+ }
+ return NULL;
+}
+
+/* Allocates a new block and updates core id => block mapping. 'old_block'
+ points to the block that the caller thinks is attached to
+ 'core_id'. 'old_block' may be NULL. Returns non-zero if:
+ - allocated a new block OR
+ - 'core_id' => 'old_block' mapping changed (another thread allocated a
+ block before lock was acquired). */
+static int cl_allocate_core_local_block(int32_t core_id, cl_block *old_block) {
+ /* Now that we have the lock, check if core-local mapping has changed. */
+ cl_core_local_block *core_local_block = &g_log.core_local_blocks[core_id];
+ cl_block *block = cl_core_local_block_get_block(core_local_block);
+ if ((block != NULL) && (block != old_block)) {
+ return 1;
+ }
+ if (block != NULL) {
+ cl_core_local_block_set_block(core_local_block, NULL);
+ cl_block_list_insert_at_tail(&g_log.dirty_block_list, block);
+ }
+ block = cl_allocate_block();
+ if (block == NULL) {
+ gpr_atm_rel_store(&g_log.is_full, 1);
+ return 0;
+ }
+ cl_core_local_block_set_block(core_local_block, block);
+ cl_block_enable_access(block);
+ return 1;
+}
+
+static cl_block *cl_get_block(void *record) {
+ uintptr_t p = (uintptr_t)((char *)record - g_log.buffer);
+ uintptr_t index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE;
+ return &g_log.blocks[index];
+}
+
+/* Gets the next block to read and tries to free 'prev' block (if not NULL).
+ Returns NULL if reached the end. */
+static cl_block *cl_next_block_to_read(cl_block *prev) {
+ cl_block *block = NULL;
+ if (g_log.read_iterator_state == g_log.num_cores) {
+ /* We are traversing dirty list; find the next dirty block. */
+ if (prev != NULL) {
+ /* Try to free the previous block if there is no unread data. This block
+ may have unread data if previously incomplete record completed between
+ read_next() calls. */
+ block = prev->link.next->block;
+ if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) {
+ cl_block_list_remove(&g_log.dirty_block_list, prev);
+ cl_block_list_insert_at_head(&g_log.free_block_list, prev);
+ gpr_atm_rel_store(&g_log.is_full, 0);
+ }
+ } else {
+ block = cl_block_list_head(&g_log.dirty_block_list);
+ }
+ if (block != NULL) {
+ return block;
+ }
+ /* We are done with the dirty list; moving on to core-local blocks. */
+ }
+ while (g_log.read_iterator_state > 0) {
+ g_log.read_iterator_state--;
+ block = cl_core_local_block_get_block(
+ &g_log.core_local_blocks[g_log.read_iterator_state]);
+ if (block != NULL) {
+ return block;
+ }
+ }
+ return NULL;
+}
+
+/* External functions: primary stats_log interface */
+void census_log_initialize(size_t size_in_mb, int discard_old_records) {
+ int32_t ix;
+ /* Check cacheline alignment. */
+ GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0);
+ GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0);
+ GPR_ASSERT(!g_log.initialized);
+ g_log.discard_old_records = discard_old_records;
+ g_log.num_cores = gpr_cpu_num_cores();
+ /* Ensure at least as many blocks as there are cores. */
+ g_log.num_blocks = GPR_MAX(
+ g_log.num_cores, (size_in_mb << 20) >> CENSUS_LOG_2_MAX_RECORD_SIZE);
+ gpr_mu_init(&g_log.lock);
+ g_log.read_iterator_state = 0;
+ g_log.block_being_read = NULL;
+ gpr_atm_rel_store(&g_log.is_full, 0);
+ g_log.core_local_blocks = (cl_core_local_block *)gpr_malloc_aligned(
+ g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG);
+ memset(g_log.core_local_blocks, 0,
+ g_log.num_cores * sizeof(cl_core_local_block));
+ g_log.blocks = (cl_block *)gpr_malloc_aligned(
+ g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE_LOG);
+ memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block));
+ g_log.buffer = gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
+ memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
+ cl_block_list_initialize(&g_log.free_block_list);
+ cl_block_list_initialize(&g_log.dirty_block_list);
+ for (ix = 0; ix < g_log.num_blocks; ++ix) {
+ cl_block *block = g_log.blocks + ix;
+ cl_block_initialize(block,
+ g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * ix));
+ cl_block_try_disable_access(block, 1 /* discard data */);
+ cl_block_list_insert_at_tail(&g_log.free_block_list, block);
+ }
+ gpr_atm_rel_store(&g_log.out_of_space_count, 0);
+ g_log.initialized = 1;
+}
+
+void census_log_shutdown(void) {
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_destroy(&g_log.lock);
+ gpr_free_aligned(g_log.core_local_blocks);
+ g_log.core_local_blocks = NULL;
+ gpr_free_aligned(g_log.blocks);
+ g_log.blocks = NULL;
+ gpr_free(g_log.buffer);
+ g_log.buffer = NULL;
+ g_log.initialized = 0;
+}
+
+void *census_log_start_write(size_t size) {
+ /* Used to bound number of times block allocation is attempted. */
+ int32_t attempts_remaining = g_log.num_blocks;
+ /* TODO(aveitch): move this inside the do loop when current_cpu is fixed */
+ int32_t core_id = gpr_cpu_current_cpu();
+ GPR_ASSERT(g_log.initialized);
+ if (size > CENSUS_LOG_MAX_RECORD_SIZE) {
+ return NULL;
+ }
+ do {
+ int allocated;
+ void *record = NULL;
+ cl_block *block =
+ cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]);
+ if (block && (record = cl_block_start_write(block, size))) {
+ return record;
+ }
+ /* Need to allocate a new block. We are here if:
+ - No block associated with the core OR
+ - Write in-progress on the block OR
+ - block is out of space */
+ if (gpr_atm_acq_load(&g_log.is_full)) {
+ gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
+ return NULL;
+ }
+ gpr_mu_lock(&g_log.lock);
+ allocated = cl_allocate_core_local_block(core_id, block);
+ gpr_mu_unlock(&g_log.lock);
+ if (!allocated) {
+ gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
+ return NULL;
+ }
+ } while (attempts_remaining--);
+ /* Give up. */
+ gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
+ return NULL;
+}
+
+void census_log_end_write(void *record, size_t bytes_written) {
+ GPR_ASSERT(g_log.initialized);
+ cl_block_end_write(cl_get_block(record), bytes_written);
+}
+
+void census_log_init_reader(void) {
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_lock(&g_log.lock);
+ /* If a block is locked for reading unlock it. */
+ if (g_log.block_being_read != NULL) {
+ cl_block_end_read(g_log.block_being_read);
+ g_log.block_being_read = NULL;
+ }
+ g_log.read_iterator_state = g_log.num_cores;
+ gpr_mu_unlock(&g_log.lock);
+}
+
+const void *census_log_read_next(size_t *bytes_available) {
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_lock(&g_log.lock);
+ if (g_log.block_being_read != NULL) {
+ cl_block_end_read(g_log.block_being_read);
+ }
+ do {
+ g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read);
+ if (g_log.block_being_read != NULL) {
+ void *record =
+ cl_block_start_read(g_log.block_being_read, bytes_available);
+ if (record != NULL) {
+ gpr_mu_unlock(&g_log.lock);
+ return record;
+ }
+ }
+ } while (g_log.block_being_read != NULL);
+ gpr_mu_unlock(&g_log.lock);
+ return NULL;
+}
+
+size_t census_log_remaining_space(void) {
+ size_t space;
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_lock(&g_log.lock);
+ if (g_log.discard_old_records) {
+ /* Remaining space is not meaningful; just return the entire log space. */
+ space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE;
+ } else {
+ space = g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE;
+ }
+ gpr_mu_unlock(&g_log.lock);
+ return space;
+}
+
+int census_log_out_of_space_count(void) {
+ GPR_ASSERT(g_log.initialized);
+ return gpr_atm_acq_load(&g_log.out_of_space_count);
+}
diff --git a/src/core/ext/census/census_log.h b/src/core/ext/census/census_log.h
new file mode 100644
index 0000000000..534ecc5705
--- /dev/null
+++ b/src/core/ext/census/census_log.h
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_LOG_H
+#define GRPC_CORE_EXT_CENSUS_CENSUS_LOG_H
+
+#include <stddef.h>
+
+/* Maximum record size, in bytes. */
+#define CENSUS_LOG_2_MAX_RECORD_SIZE 14 /* 2^14 = 16KB */
+#define CENSUS_LOG_MAX_RECORD_SIZE (1 << CENSUS_LOG_2_MAX_RECORD_SIZE)
+
+/* Initialize the statistics logging subsystem with the given log size. A log
+ size of 0 will result in the smallest possible log for the platform
+ (approximately CENSUS_LOG_MAX_RECORD_SIZE * gpr_cpu_num_cores()). If
+ discard_old_records is non-zero, then new records will displace older ones
+ when the log is full. This function must be called before any other
+ census_log functions.
+*/
+void census_log_initialize(size_t size_in_mb, int discard_old_records);
+
+/* Shutdown the logging subsystem. Caller must ensure that:
+ - no in progress or future call to any census_log functions
+ - no incomplete records
+*/
+void census_log_shutdown(void);
+
+/* Allocates and returns a 'size' bytes record and marks it in use. A
+ subsequent census_log_end_write() marks the record complete. The
+ 'bytes_written' census_log_end_write() argument must be <=
+ 'size'. Returns NULL if out-of-space AND:
+ - log is configured to keep old records OR
+ - all blocks are pinned by incomplete records.
+*/
+void *census_log_start_write(size_t size);
+
+void census_log_end_write(void *record, size_t bytes_written);
+
+/* census_log_read_next() iterates over blocks with data and for each block
+ returns a pointer to the first unread byte. The number of bytes that can be
+ read are returned in 'bytes_available'. Reader is expected to read all
+ available data. Reading the data consumes it i.e. it cannot be read again.
+ census_log_read_next() returns NULL if the end is reached i.e last block
+ is read. census_log_init_reader() starts the iteration or aborts the
+ current iteration.
+*/
+void census_log_init_reader(void);
+const void *census_log_read_next(size_t *bytes_available);
+
+/* Returns estimated remaining space across all blocks, in bytes. If log is
+ configured to discard old records, returns total log space. Otherwise,
+ returns space available in empty blocks (partially filled blocks are
+ treated as full).
+*/
+size_t census_log_remaining_space(void);
+
+/* Returns the number of times gprc_stats_log_start_write() failed due to
+ out-of-space. */
+int census_log_out_of_space_count(void);
+
+#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_LOG_H */
diff --git a/src/core/ext/census/census_rpc_stats.c b/src/core/ext/census/census_rpc_stats.c
new file mode 100644
index 0000000000..09ee12d54b
--- /dev/null
+++ b/src/core/ext/census/census_rpc_stats.c
@@ -0,0 +1,253 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include "src/core/ext/census/census_interface.h"
+#include "src/core/ext/census/census_rpc_stats.h"
+#include "src/core/ext/census/census_tracing.h"
+#include "src/core/ext/census/hash_table.h"
+#include "src/core/ext/census/window_stats.h"
+#include "src/core/lib/support/murmur_hash.h"
+#include "src/core/lib/support/string.h"
+
+#define NUM_INTERVALS 3
+#define MINUTE_INTERVAL 0
+#define HOUR_INTERVAL 1
+#define TOTAL_INTERVAL 2
+
+/* for easier typing */
+typedef census_per_method_rpc_stats per_method_stats;
+
+/* Ensure mu is only initialized once. */
+static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT;
+/* Guards two stats stores. */
+static gpr_mu g_mu;
+static census_ht *g_client_stats_store = NULL;
+static census_ht *g_server_stats_store = NULL;
+
+static void init_mutex(void) { gpr_mu_init(&g_mu); }
+
+static void init_mutex_once(void) {
+ gpr_once_init(&g_stats_store_mu_init, init_mutex);
+}
+
+static int cmp_str_keys(const void *k1, const void *k2) {
+ return strcmp((const char *)k1, (const char *)k2);
+}
+
+/* TODO(hongyu): replace it with cityhash64 */
+static uint64_t simple_hash(const void *k) {
+ size_t len = strlen(k);
+ uint64_t higher = gpr_murmur_hash3((const char *)k, len / 2, 0);
+ return higher << 32 |
+ gpr_murmur_hash3((const char *)k + len / 2, len - len / 2, 0);
+}
+
+static void delete_stats(void *stats) {
+ census_window_stats_destroy((struct census_window_stats *)stats);
+}
+
+static void delete_key(void *key) { gpr_free(key); }
+
+static const census_ht_option ht_opt = {
+ CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */,
+ simple_hash /* hash function */, cmp_str_keys /* key comparator */,
+ delete_stats /* data deleter */, delete_key /* key deleter */
+};
+
+static void init_rpc_stats(void *stats) {
+ memset(stats, 0, sizeof(census_rpc_stats));
+}
+
+static void stat_add_proportion(double p, void *base, const void *addme) {
+ census_rpc_stats *b = (census_rpc_stats *)base;
+ census_rpc_stats *a = (census_rpc_stats *)addme;
+ b->cnt += p * a->cnt;
+ b->rpc_error_cnt += p * a->rpc_error_cnt;
+ b->app_error_cnt += p * a->app_error_cnt;
+ b->elapsed_time_ms += p * a->elapsed_time_ms;
+ b->api_request_bytes += p * a->api_request_bytes;
+ b->wire_request_bytes += p * a->wire_request_bytes;
+ b->api_response_bytes += p * a->api_response_bytes;
+ b->wire_response_bytes += p * a->wire_response_bytes;
+}
+
+static void stat_add(void *base, const void *addme) {
+ stat_add_proportion(1.0, base, addme);
+}
+
+static gpr_timespec min_hour_total_intervals[3] = {
+ {60, 0}, {3600, 0}, {36000000, 0}};
+
+static const census_window_stats_stat_info window_stats_settings = {
+ sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion};
+
+census_rpc_stats *census_rpc_stats_create_empty(void) {
+ census_rpc_stats *ret =
+ (census_rpc_stats *)gpr_malloc(sizeof(census_rpc_stats));
+ memset(ret, 0, sizeof(census_rpc_stats));
+ return ret;
+}
+
+void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats *data) {
+ int i = 0;
+ for (i = 0; i < data->num_entries; i++) {
+ if (data->stats[i].method != NULL) {
+ gpr_free((void *)data->stats[i].method);
+ }
+ }
+ if (data->stats != NULL) {
+ gpr_free(data->stats);
+ }
+ data->num_entries = 0;
+ data->stats = NULL;
+}
+
+static void record_stats(census_ht *store, census_op_id op_id,
+ const census_rpc_stats *stats) {
+ gpr_mu_lock(&g_mu);
+ if (store != NULL) {
+ census_trace_obj *trace = NULL;
+ census_internal_lock_trace_store();
+ trace = census_get_trace_obj_locked(op_id);
+ if (trace != NULL) {
+ const char *method_name = census_get_trace_method_name(trace);
+ struct census_window_stats *window_stats = NULL;
+ census_ht_key key;
+ key.ptr = (void *)method_name;
+ window_stats = census_ht_find(store, key);
+ census_internal_unlock_trace_store();
+ if (window_stats == NULL) {
+ window_stats = census_window_stats_create(3, min_hour_total_intervals,
+ 30, &window_stats_settings);
+ key.ptr = gpr_strdup(key.ptr);
+ census_ht_insert(store, key, (void *)window_stats);
+ }
+ census_window_stats_add(window_stats, gpr_now(GPR_CLOCK_REALTIME), stats);
+ } else {
+ census_internal_unlock_trace_store();
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_record_rpc_client_stats(census_op_id op_id,
+ const census_rpc_stats *stats) {
+ record_stats(g_client_stats_store, op_id, stats);
+}
+
+void census_record_rpc_server_stats(census_op_id op_id,
+ const census_rpc_stats *stats) {
+ record_stats(g_server_stats_store, op_id, stats);
+}
+
+/* Get stats from input stats store */
+static void get_stats(census_ht *store, census_aggregated_rpc_stats *data) {
+ GPR_ASSERT(data != NULL);
+ if (data->num_entries != 0) {
+ census_aggregated_rpc_stats_set_empty(data);
+ }
+ gpr_mu_lock(&g_mu);
+ if (store != NULL) {
+ size_t n;
+ unsigned i, j;
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
+ census_ht_kv *kv = census_ht_get_all_elements(store, &n);
+ if (kv != NULL) {
+ data->num_entries = n;
+ data->stats =
+ (per_method_stats *)gpr_malloc(sizeof(per_method_stats) * n);
+ for (i = 0; i < n; i++) {
+ census_window_stats_sums sums[NUM_INTERVALS];
+ for (j = 0; j < NUM_INTERVALS; j++) {
+ sums[j].statistic = (void *)census_rpc_stats_create_empty();
+ }
+ data->stats[i].method = gpr_strdup(kv[i].k.ptr);
+ census_window_stats_get_sums(kv[i].v, now, sums);
+ data->stats[i].minute_stats =
+ *(census_rpc_stats *)sums[MINUTE_INTERVAL].statistic;
+ data->stats[i].hour_stats =
+ *(census_rpc_stats *)sums[HOUR_INTERVAL].statistic;
+ data->stats[i].total_stats =
+ *(census_rpc_stats *)sums[TOTAL_INTERVAL].statistic;
+ for (j = 0; j < NUM_INTERVALS; j++) {
+ gpr_free(sums[j].statistic);
+ }
+ }
+ gpr_free(kv);
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_get_client_stats(census_aggregated_rpc_stats *data) {
+ get_stats(g_client_stats_store, data);
+}
+
+void census_get_server_stats(census_aggregated_rpc_stats *data) {
+ get_stats(g_server_stats_store, data);
+}
+
+void census_stats_store_init(void) {
+ init_mutex_once();
+ gpr_mu_lock(&g_mu);
+ if (g_client_stats_store == NULL && g_server_stats_store == NULL) {
+ g_client_stats_store = census_ht_create(&ht_opt);
+ g_server_stats_store = census_ht_create(&ht_opt);
+ } else {
+ gpr_log(GPR_ERROR, "Census stats store already initialized.");
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_stats_store_shutdown(void) {
+ init_mutex_once();
+ gpr_mu_lock(&g_mu);
+ if (g_client_stats_store != NULL) {
+ census_ht_destroy(g_client_stats_store);
+ g_client_stats_store = NULL;
+ } else {
+ gpr_log(GPR_ERROR, "Census server stats store not initialized.");
+ }
+ if (g_server_stats_store != NULL) {
+ census_ht_destroy(g_server_stats_store);
+ g_server_stats_store = NULL;
+ } else {
+ gpr_log(GPR_ERROR, "Census client stats store not initialized.");
+ }
+ gpr_mu_unlock(&g_mu);
+}
diff --git a/src/core/ext/census/census_rpc_stats.h b/src/core/ext/census/census_rpc_stats.h
new file mode 100644
index 0000000000..7e4d8d1640
--- /dev/null
+++ b/src/core/ext/census/census_rpc_stats.h
@@ -0,0 +1,101 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_RPC_STATS_H
+#define GRPC_CORE_EXT_CENSUS_CENSUS_RPC_STATS_H
+
+#include <grpc/support/port_platform.h>
+#include "src/core/ext/census/census_interface.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct census_rpc_stats {
+ uint64_t cnt;
+ uint64_t rpc_error_cnt;
+ uint64_t app_error_cnt;
+ double elapsed_time_ms;
+ double api_request_bytes;
+ double wire_request_bytes;
+ double api_response_bytes;
+ double wire_response_bytes;
+};
+
+/* Creates an empty rpc stats object on heap. */
+census_rpc_stats *census_rpc_stats_create_empty(void);
+
+typedef struct census_per_method_rpc_stats {
+ const char *method;
+ census_rpc_stats minute_stats; /* cumulative stats in the past minute */
+ census_rpc_stats hour_stats; /* cumulative stats in the past hour */
+ census_rpc_stats total_stats; /* cumulative stats from last gc */
+} census_per_method_rpc_stats;
+
+typedef struct census_aggregated_rpc_stats {
+ int num_entries;
+ census_per_method_rpc_stats *stats;
+} census_aggregated_rpc_stats;
+
+/* Initializes an aggregated rpc stats object to an empty state. */
+void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats *data);
+
+/* Records client side stats of a rpc. */
+void census_record_rpc_client_stats(census_op_id op_id,
+ const census_rpc_stats *stats);
+
+/* Records server side stats of a rpc. */
+void census_record_rpc_server_stats(census_op_id op_id,
+ const census_rpc_stats *stats);
+
+/* The following two functions are intended for inprocess query of
+ per-service per-method stats from grpc implementations. */
+
+/* Populates *data_map with server side aggregated per-service per-method
+ stats.
+ DO NOT CALL from outside of grpc code. */
+void census_get_server_stats(census_aggregated_rpc_stats *data_map);
+
+/* Populates *data_map with client side aggregated per-service per-method
+ stats.
+ DO NOT CALL from outside of grpc code. */
+void census_get_client_stats(census_aggregated_rpc_stats *data_map);
+
+void census_stats_store_init(void);
+void census_stats_store_shutdown(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_RPC_STATS_H */
diff --git a/src/core/ext/census/census_tracing.c b/src/core/ext/census/census_tracing.c
new file mode 100644
index 0000000000..f893dc9864
--- /dev/null
+++ b/src/core/ext/census/census_tracing.c
@@ -0,0 +1,241 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/census_tracing.h"
+#include "src/core/ext/census/census_interface.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/sync.h>
+#include "src/core/ext/census/hash_table.h"
+#include "src/core/lib/support/string.h"
+
+void census_trace_obj_destroy(census_trace_obj *obj) {
+ census_trace_annotation *p = obj->annotations;
+ while (p != NULL) {
+ census_trace_annotation *next = p->next;
+ gpr_free(p);
+ p = next;
+ }
+ gpr_free(obj->method);
+ gpr_free(obj);
+}
+
+static void delete_trace_obj(void *obj) {
+ census_trace_obj_destroy((census_trace_obj *)obj);
+}
+
+static const census_ht_option ht_opt = {
+ CENSUS_HT_UINT64 /* key type */,
+ 571 /* n_of_buckets */,
+ NULL /* hash */,
+ NULL /* compare_keys */,
+ delete_trace_obj /* delete data */,
+ NULL /* delete key */
+};
+
+static gpr_once g_init_mutex_once = GPR_ONCE_INIT;
+static gpr_mu g_mu; /* Guards following two static variables. */
+static census_ht *g_trace_store = NULL;
+static uint64_t g_id = 0;
+
+static census_ht_key op_id_as_key(census_op_id *id) {
+ return *(census_ht_key *)id;
+}
+
+static uint64_t op_id_2_uint64(census_op_id *id) {
+ uint64_t ret;
+ memcpy(&ret, id, sizeof(census_op_id));
+ return ret;
+}
+
+static void init_mutex(void) { gpr_mu_init(&g_mu); }
+
+static void init_mutex_once(void) {
+ gpr_once_init(&g_init_mutex_once, init_mutex);
+}
+
+census_op_id census_tracing_start_op(void) {
+ gpr_mu_lock(&g_mu);
+ {
+ census_trace_obj *ret = gpr_malloc(sizeof(census_trace_obj));
+ memset(ret, 0, sizeof(census_trace_obj));
+ g_id++;
+ memcpy(&ret->id, &g_id, sizeof(census_op_id));
+ ret->rpc_stats.cnt = 1;
+ ret->ts = gpr_now(GPR_CLOCK_REALTIME);
+ census_ht_insert(g_trace_store, op_id_as_key(&ret->id), (void *)ret);
+ gpr_log(GPR_DEBUG, "Start tracing for id %lu", g_id);
+ gpr_mu_unlock(&g_mu);
+ return ret->id;
+ }
+}
+
+int census_add_method_tag(census_op_id op_id, const char *method) {
+ int ret = 0;
+ census_trace_obj *trace = NULL;
+ gpr_mu_lock(&g_mu);
+ trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
+ if (trace == NULL) {
+ ret = 1;
+ } else {
+ trace->method = gpr_strdup(method);
+ }
+ gpr_mu_unlock(&g_mu);
+ return ret;
+}
+
+void census_tracing_print(census_op_id op_id, const char *anno_txt) {
+ census_trace_obj *trace = NULL;
+ gpr_mu_lock(&g_mu);
+ trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
+ if (trace != NULL) {
+ census_trace_annotation *anno = gpr_malloc(sizeof(census_trace_annotation));
+ anno->ts = gpr_now(GPR_CLOCK_REALTIME);
+ {
+ char *d = anno->txt;
+ const char *s = anno_txt;
+ int n = 0;
+ for (; n < CENSUS_MAX_ANNOTATION_LENGTH && *s != '\0'; ++n) {
+ *d++ = *s++;
+ }
+ *d = '\0';
+ }
+ anno->next = trace->annotations;
+ trace->annotations = anno;
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_tracing_end_op(census_op_id op_id) {
+ census_trace_obj *trace = NULL;
+ gpr_mu_lock(&g_mu);
+ trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
+ if (trace != NULL) {
+ trace->rpc_stats.elapsed_time_ms = gpr_timespec_to_micros(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), trace->ts));
+ gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us",
+ op_id_2_uint64(&op_id), trace->method,
+ trace->rpc_stats.elapsed_time_ms);
+ census_ht_erase(g_trace_store, op_id_as_key(&op_id));
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_tracing_init(void) {
+ init_mutex_once();
+ gpr_mu_lock(&g_mu);
+ if (g_trace_store == NULL) {
+ g_id = 1;
+ g_trace_store = census_ht_create(&ht_opt);
+ } else {
+ gpr_log(GPR_ERROR, "Census trace store already initialized.");
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_tracing_shutdown(void) {
+ gpr_mu_lock(&g_mu);
+ if (g_trace_store != NULL) {
+ census_ht_destroy(g_trace_store);
+ g_trace_store = NULL;
+ } else {
+ gpr_log(GPR_ERROR, "Census trace store is not initialized.");
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void census_internal_lock_trace_store(void) { gpr_mu_lock(&g_mu); }
+
+void census_internal_unlock_trace_store(void) { gpr_mu_unlock(&g_mu); }
+
+census_trace_obj *census_get_trace_obj_locked(census_op_id op_id) {
+ if (g_trace_store == NULL) {
+ gpr_log(GPR_ERROR, "Census trace store is not initialized.");
+ return NULL;
+ }
+ return (census_trace_obj *)census_ht_find(g_trace_store,
+ op_id_as_key(&op_id));
+}
+
+const char *census_get_trace_method_name(const census_trace_obj *trace) {
+ return trace->method;
+}
+
+static census_trace_annotation *dup_annotation_chain(
+ census_trace_annotation *from) {
+ census_trace_annotation *ret = NULL;
+ census_trace_annotation **to = &ret;
+ for (; from != NULL; from = from->next) {
+ *to = gpr_malloc(sizeof(census_trace_annotation));
+ memcpy(*to, from, sizeof(census_trace_annotation));
+ to = &(*to)->next;
+ }
+ return ret;
+}
+
+static census_trace_obj *trace_obj_dup(census_trace_obj *from) {
+ census_trace_obj *to = NULL;
+ GPR_ASSERT(from != NULL);
+ to = gpr_malloc(sizeof(census_trace_obj));
+ to->id = from->id;
+ to->ts = from->ts;
+ to->rpc_stats = from->rpc_stats;
+ to->method = gpr_strdup(from->method);
+ to->annotations = dup_annotation_chain(from->annotations);
+ return to;
+}
+
+census_trace_obj **census_get_active_ops(int *num_active_ops) {
+ census_trace_obj **ret = NULL;
+ gpr_mu_lock(&g_mu);
+ if (g_trace_store != NULL) {
+ size_t n = 0;
+ census_ht_kv *all_kvs = census_ht_get_all_elements(g_trace_store, &n);
+ *num_active_ops = (int)n;
+ if (n != 0) {
+ size_t i = 0;
+ ret = gpr_malloc(sizeof(census_trace_obj *) * n);
+ for (i = 0; i < n; i++) {
+ ret[i] = trace_obj_dup((census_trace_obj *)all_kvs[i].v);
+ }
+ }
+ gpr_free(all_kvs);
+ }
+ gpr_mu_unlock(&g_mu);
+ return ret;
+}
diff --git a/src/core/ext/census/census_tracing.h b/src/core/ext/census/census_tracing.h
new file mode 100644
index 0000000000..42a0d7403e
--- /dev/null
+++ b/src/core/ext/census/census_tracing.h
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_CENSUS_TRACING_H
+#define GRPC_CORE_EXT_CENSUS_CENSUS_TRACING_H
+
+#include <grpc/support/time.h>
+#include "src/core/ext/census/census_rpc_stats.h"
+
+/* WARNING: The data structures and APIs provided by this file are for GRPC
+ library's internal use ONLY. They might be changed in backward-incompatible
+ ways and are not subject to any deprecation policy.
+ They are not recommended for external use.
+ */
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Struct for a trace annotation. */
+typedef struct census_trace_annotation {
+ gpr_timespec ts; /* timestamp of the annotation */
+ char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */
+ struct census_trace_annotation *next;
+} census_trace_annotation;
+
+typedef struct census_trace_obj {
+ census_op_id id;
+ gpr_timespec ts;
+ census_rpc_stats rpc_stats;
+ char *method;
+ census_trace_annotation *annotations;
+} census_trace_obj;
+
+/* Deletes trace object. */
+void census_trace_obj_destroy(census_trace_obj *obj);
+
+/* Initializes trace store. This function is thread safe. */
+void census_tracing_init(void);
+
+/* Shutsdown trace store. This function is thread safe. */
+void census_tracing_shutdown(void);
+
+/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store
+ is not initialized or trace obj is not found. Requires trace store being
+ locked before calling this function. */
+census_trace_obj *census_get_trace_obj_locked(census_op_id op_id);
+
+/* The following two functions acquire and release the trace store global lock.
+ They are for census internal use only. */
+void census_internal_lock_trace_store(void);
+void census_internal_unlock_trace_store(void);
+
+/* Gets method name associated with the input trace object. */
+const char *census_get_trace_method_name(const census_trace_obj *trace);
+
+/* Returns an array of pointers to trace objects of currently active operations
+ and fills in number of active operations. Returns NULL if there are no active
+ operations.
+ Caller owns the returned objects. */
+census_trace_obj **census_get_active_ops(int *num_active_ops);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_EXT_CENSUS_CENSUS_TRACING_H */
diff --git a/src/core/ext/census/context.c b/src/core/ext/census/context.c
new file mode 100644
index 0000000000..0dfc4ecbf1
--- /dev/null
+++ b/src/core/ext/census/context.c
@@ -0,0 +1,509 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+#include <string.h>
+#include "src/core/lib/support/string.h"
+
+// Functions in this file support the public context API, including
+// encoding/decoding as part of context propagation across RPC's. The overall
+// requirements (in approximate priority order) for the
+// context representation:
+// 1. Efficient conversion to/from wire format
+// 2. Minimal bytes used on-wire
+// 3. Efficient context creation
+// 4. Efficient lookup of tag value for a key
+// 5. Efficient iteration over tags
+// 6. Minimal memory footprint
+//
+// Notes on tradeoffs/decisions:
+// * tag includes 1 byte length of key, as well as nil-terminating byte. These
+// are to aid in efficient parsing and the ability to directly return key
+// strings. This is more important than saving a single byte/tag on the wire.
+// * The wire encoding uses only single byte values. This eliminates the need
+// to handle endian-ness conversions. It also means there is a hard upper
+// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS.
+// * Keep all tag information (keys/values/flags) in a single memory buffer,
+// that can be directly copied to the wire.
+
+// min and max valid chars in tag keys and values. All printable ASCII is OK.
+#define MIN_VALID_TAG_CHAR 32 // ' '
+#define MAX_VALID_TAG_CHAR 126 // '~'
+
+// Structure representing a set of tags. Essentially a count of number of tags
+// present, and pointer to a chunk of memory that contains the per-tag details.
+struct tag_set {
+ int ntags; // number of tags.
+ int ntags_alloc; // ntags + number of deleted tags (total number of tags
+ // in all of kvm). This will always be == ntags, except during the process
+ // of building a new tag set.
+ size_t kvm_size; // number of bytes allocated for key/value storage.
+ size_t kvm_used; // number of bytes of used key/value memory
+ char *kvm; // key/value memory. Consists of repeated entries of:
+ // Offset Size Description
+ // 0 1 Key length, including trailing 0. (K)
+ // 1 1 Value length, including trailing 0 (V)
+ // 2 1 Flags
+ // 3 K Key bytes
+ // 3 + K V Value bytes
+ //
+ // We refer to the first 3 entries as the 'tag header'. If extra values are
+ // introduced in the header, you will need to modify the TAG_HEADER_SIZE
+ // constant, the raw_tag structure (and everything that uses it) and the
+ // encode/decode functions appropriately.
+};
+
+// Number of bytes in tag header.
+#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1)
+// Offsets to tag header entries.
+#define KEY_LEN_OFFSET 0
+#define VALUE_LEN_OFFSET 1
+#define FLAG_OFFSET 2
+
+// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set.
+struct raw_tag {
+ uint8_t key_len;
+ uint8_t value_len;
+ uint8_t flags;
+ char *key;
+ char *value;
+};
+
+// Use a reserved flag bit for indication of deleted tag.
+#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED
+#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED)
+
+// Primary representation of a context. Composed of 2 underlying tag_set
+// structs, one each for propagated and local (non-propagated) tags. This is
+// to efficiently support tag encoding/decoding.
+// TODO(aveitch): need to add tracing id's/structure.
+struct census_context {
+ struct tag_set tags[2];
+ census_context_status status;
+};
+
+// Indices into the tags member of census_context
+#define PROPAGATED_TAGS 0
+#define LOCAL_TAGS 1
+
+// Validate (check all characters are in range and size is less than limit) a
+// key or value string. Returns 0 if the string is invalid, or the length
+// (including terminator) if valid.
+static size_t validate_tag(const char *kv) {
+ size_t len = 1;
+ char ch;
+ while ((ch = *kv++) != 0) {
+ if (ch < MIN_VALID_TAG_CHAR || ch > MAX_VALID_TAG_CHAR) {
+ return 0;
+ }
+ len++;
+ }
+ if (len > CENSUS_MAX_TAG_KV_LEN) {
+ return 0;
+ }
+ return len;
+}
+
+// Extract a raw tag given a pointer (raw) to the tag header. Allow for some
+// extra bytes in the tag header (see encode/decode functions for usage: this
+// allows for future expansion of the tag header).
+static char *decode_tag(struct raw_tag *tag, char *header, int offset) {
+ tag->key_len = (uint8_t)(*header++);
+ tag->value_len = (uint8_t)(*header++);
+ tag->flags = (uint8_t)(*header++);
+ header += offset;
+ tag->key = header;
+ header += tag->key_len;
+ tag->value = header;
+ return header + tag->value_len;
+}
+
+// Make a copy (in 'to') of an existing tag_set.
+static void tag_set_copy(struct tag_set *to, const struct tag_set *from) {
+ memcpy(to, from, sizeof(struct tag_set));
+ to->kvm = gpr_malloc(to->kvm_size);
+ memcpy(to->kvm, from->kvm, from->kvm_used);
+}
+
+// Delete a tag from a tag_set, if it exists (returns true if it did).
+static bool tag_set_delete_tag(struct tag_set *tags, const char *key,
+ size_t key_len) {
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags_alloc; i++) {
+ uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET);
+ struct raw_tag tag;
+ kvp = decode_tag(&tag, kvp, 0);
+ if (CENSUS_TAG_IS_DELETED(tag.flags)) continue;
+ if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) {
+ *flags |= CENSUS_TAG_DELETED;
+ tags->ntags--;
+ return true;
+ }
+ }
+ return false;
+}
+
+// Delete a tag from a context, return true if it existed.
+static bool context_delete_tag(census_context *context, const census_tag *tag,
+ size_t key_len) {
+ return (
+ tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) ||
+ tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len));
+}
+
+// Add a tag to a tag_set. Return true on success, false if the tag could
+// not be added because of constraints on tag set size. This function should
+// not be called if the tag may already exist (in a non-deleted state) in
+// the tag_set, as that would result in two tags with the same key.
+static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
+ size_t key_len, size_t value_len) {
+ if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) {
+ return false;
+ }
+ const size_t tag_size = key_len + value_len + TAG_HEADER_SIZE;
+ if (tags->kvm_used + tag_size > tags->kvm_size) {
+ // allocate new memory if needed
+ tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
+ char *new_kvm = gpr_malloc(tags->kvm_size);
+ memcpy(new_kvm, tags->kvm, tags->kvm_used);
+ gpr_free(tags->kvm);
+ tags->kvm = new_kvm;
+ }
+ char *kvp = tags->kvm + tags->kvm_used;
+ *kvp++ = (char)key_len;
+ *kvp++ = (char)value_len;
+ // ensure reserved flags are not used.
+ *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS));
+ memcpy(kvp, tag->key, key_len);
+ kvp += key_len;
+ memcpy(kvp, tag->value, value_len);
+ tags->kvm_used += tag_size;
+ tags->ntags++;
+ tags->ntags_alloc++;
+ return true;
+}
+
+// Add/modify/delete a tag to/in a context. Caller must validate that tag key
+// etc. are valid.
+static void context_modify_tag(census_context *context, const census_tag *tag,
+ size_t key_len, size_t value_len) {
+ // First delete the tag if it is already present.
+ bool deleted = context_delete_tag(context, tag, key_len);
+ bool added = false;
+ if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
+ added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len,
+ value_len);
+ } else {
+ added =
+ tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len, value_len);
+ }
+
+ if (deleted) {
+ context->status.n_modified_tags++;
+ } else {
+ if (added) {
+ context->status.n_added_tags++;
+ } else {
+ context->status.n_ignored_tags++;
+ }
+ }
+}
+
+// Remove memory used for deleted tags from a tag set. Basic algorithm:
+// 1) Walk through tag set to find first deleted tag. Record where it is.
+// 2) Find the next not-deleted tag. Copy all of kvm from there to the end
+// "over" the deleted tags
+// 3) repeat #1 and #2 until we have seen all tags
+// 4) if we are still looking for a not-deleted tag, then all the end portion
+// of the kvm is deleted. Just reduce the used amount of memory by the
+// appropriate amount.
+static void tag_set_flatten(struct tag_set *tags) {
+ if (tags->ntags == tags->ntags_alloc) return;
+ bool found_deleted = false; // found a deleted tag.
+ char *kvp = tags->kvm;
+ char *dbase = NULL; // record location of deleted tag
+ for (int i = 0; i < tags->ntags_alloc; i++) {
+ struct raw_tag tag;
+ char *next_kvp = decode_tag(&tag, kvp, 0);
+ if (found_deleted) {
+ if (!CENSUS_TAG_IS_DELETED(tag.flags)) {
+ ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags
+ GPR_ASSERT(reduce > 0);
+ ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp;
+ GPR_ASSERT(copy_size > 0);
+ memmove(dbase, kvp, (size_t)copy_size);
+ tags->kvm_used -= (size_t)reduce;
+ next_kvp -= reduce;
+ found_deleted = false;
+ }
+ } else {
+ if (CENSUS_TAG_IS_DELETED(tag.flags)) {
+ dbase = kvp;
+ found_deleted = true;
+ }
+ }
+ kvp = next_kvp;
+ }
+ if (found_deleted) {
+ GPR_ASSERT(dbase > tags->kvm);
+ tags->kvm_used = (size_t)(dbase - tags->kvm);
+ }
+ tags->ntags_alloc = tags->ntags;
+}
+
+census_context *census_context_create(const census_context *base,
+ const census_tag *tags, int ntags,
+ census_context_status const **status) {
+ census_context *context = gpr_malloc(sizeof(census_context));
+ // If we are given a base, copy it into our new tag set. Otherwise set it
+ // to zero/NULL everything.
+ if (base == NULL) {
+ memset(context, 0, sizeof(census_context));
+ } else {
+ tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]);
+ tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]);
+ memset(&context->status, 0, sizeof(context->status));
+ }
+ // Walk over the additional tags and, for those that aren't invalid, modify
+ // the context to add/replace/delete as required.
+ for (int i = 0; i < ntags; i++) {
+ const census_tag *tag = &tags[i];
+ size_t key_len = validate_tag(tag->key);
+ // ignore the tag if it is invalid or too short.
+ if (key_len <= 1) {
+ context->status.n_invalid_tags++;
+ } else {
+ if (tag->value != NULL) {
+ size_t value_len = validate_tag(tag->value);
+ if (value_len != 0) {
+ context_modify_tag(context, tag, key_len, value_len);
+ } else {
+ context->status.n_invalid_tags++;
+ }
+ } else {
+ if (context_delete_tag(context, tag, key_len)) {
+ context->status.n_deleted_tags++;
+ }
+ }
+ }
+ }
+ // Remove any deleted tags, update status if needed, and return.
+ tag_set_flatten(&context->tags[PROPAGATED_TAGS]);
+ tag_set_flatten(&context->tags[LOCAL_TAGS]);
+ context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
+ context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags;
+ if (status) {
+ *status = &context->status;
+ }
+ return context;
+}
+
+const census_context_status *census_context_get_status(
+ const census_context *context) {
+ return &context->status;
+}
+
+void census_context_destroy(census_context *context) {
+ gpr_free(context->tags[PROPAGATED_TAGS].kvm);
+ gpr_free(context->tags[LOCAL_TAGS].kvm);
+ gpr_free(context);
+}
+
+void census_context_initialize_iterator(const census_context *context,
+ census_context_iterator *iterator) {
+ iterator->context = context;
+ iterator->index = 0;
+ if (context->tags[PROPAGATED_TAGS].ntags != 0) {
+ iterator->base = PROPAGATED_TAGS;
+ iterator->kvm = context->tags[PROPAGATED_TAGS].kvm;
+ } else if (context->tags[LOCAL_TAGS].ntags != 0) {
+ iterator->base = LOCAL_TAGS;
+ iterator->kvm = context->tags[LOCAL_TAGS].kvm;
+ } else {
+ iterator->base = -1;
+ }
+}
+
+int census_context_next_tag(census_context_iterator *iterator,
+ census_tag *tag) {
+ if (iterator->base < 0) {
+ return 0;
+ }
+ struct raw_tag raw;
+ iterator->kvm = decode_tag(&raw, iterator->kvm, 0);
+ tag->key = raw.key;
+ tag->value = raw.value;
+ tag->flags = raw.flags;
+ if (++iterator->index == iterator->context->tags[iterator->base].ntags) {
+ do {
+ if (iterator->base == LOCAL_TAGS) {
+ iterator->base = -1;
+ return 1;
+ }
+ } while (iterator->context->tags[++iterator->base].ntags == 0);
+ iterator->index = 0;
+ iterator->kvm = iterator->context->tags[iterator->base].kvm;
+ }
+ return 1;
+}
+
+// Find a tag in a tag_set by key. Return true if found, false otherwise.
+static bool tag_set_get_tag(const struct tag_set *tags, const char *key,
+ size_t key_len, census_tag *tag) {
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags; i++) {
+ struct raw_tag raw;
+ kvp = decode_tag(&raw, kvp, 0);
+ if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
+ tag->key = raw.key;
+ tag->value = raw.value;
+ tag->flags = raw.flags;
+ return true;
+ }
+ }
+ return false;
+}
+
+int census_context_get_tag(const census_context *context, const char *key,
+ census_tag *tag) {
+ size_t key_len = strlen(key) + 1;
+ if (key_len == 1) {
+ return 0;
+ }
+ if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) ||
+ tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) {
+ return 1;
+ }
+ return 0;
+}
+
+// Context encoding and decoding functions.
+//
+// Wire format for tag_set's on the wire:
+//
+// First, a tag set header:
+//
+// offset bytes description
+// 0 1 version number
+// 1 1 number of bytes in this header. This allows for future
+// expansion.
+// 2 1 number of bytes in each tag header.
+// 3 1 ntags value from tag set.
+//
+// This is followed by the key/value memory from struct tag_set.
+
+#define ENCODED_VERSION 0 // Version number
+#define ENCODED_HEADER_SIZE 4 // size of tag set header
+
+// Encode a tag set. Returns 0 if buffer is too small.
+static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
+ size_t buf_size) {
+ if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) {
+ return 0;
+ }
+ buf_size -= ENCODED_HEADER_SIZE;
+ *buffer++ = (char)ENCODED_VERSION;
+ *buffer++ = (char)ENCODED_HEADER_SIZE;
+ *buffer++ = (char)TAG_HEADER_SIZE;
+ *buffer++ = (char)tags->ntags;
+ if (tags->ntags == 0) {
+ return ENCODED_HEADER_SIZE;
+ }
+ memcpy(buffer, tags->kvm, tags->kvm_used);
+ return ENCODED_HEADER_SIZE + tags->kvm_used;
+}
+
+size_t census_context_encode(const census_context *context, char *buffer,
+ size_t buf_size) {
+ return tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size);
+}
+
+// Decode a tag set.
+static void tag_set_decode(struct tag_set *tags, const char *buffer,
+ size_t size) {
+ uint8_t version = (uint8_t)(*buffer++);
+ uint8_t header_size = (uint8_t)(*buffer++);
+ uint8_t tag_header_size = (uint8_t)(*buffer++);
+ tags->ntags = tags->ntags_alloc = (int)(*buffer++);
+ if (tags->ntags == 0) {
+ tags->ntags_alloc = 0;
+ tags->kvm_size = 0;
+ tags->kvm_used = 0;
+ tags->kvm = NULL;
+ return;
+ }
+ if (header_size != ENCODED_HEADER_SIZE) {
+ GPR_ASSERT(version != ENCODED_VERSION);
+ GPR_ASSERT(ENCODED_HEADER_SIZE < header_size);
+ buffer += (header_size - ENCODED_HEADER_SIZE);
+ }
+ tags->kvm_used = size - header_size;
+ tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN;
+ tags->kvm = gpr_malloc(tags->kvm_size);
+ if (tag_header_size != TAG_HEADER_SIZE) {
+ // something new in the tag information. I don't understand it, so
+ // don't copy it over.
+ GPR_ASSERT(version != ENCODED_VERSION);
+ GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE);
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags; i++) {
+ memcpy(kvp, buffer, TAG_HEADER_SIZE);
+ kvp += header_size;
+ struct raw_tag raw;
+ buffer =
+ decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE);
+ memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len);
+ kvp += raw.key_len + raw.value_len;
+ }
+ } else {
+ memcpy(tags->kvm, buffer, tags->kvm_used);
+ }
+}
+
+census_context *census_context_decode(const char *buffer, size_t size) {
+ census_context *context = gpr_malloc(sizeof(census_context));
+ memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set));
+ if (buffer == NULL) {
+ memset(&context->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set));
+ } else {
+ tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size);
+ }
+ memset(&context->status, 0, sizeof(context->status));
+ context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
+ return context;
+}
diff --git a/src/core/ext/census/grpc_context.c b/src/core/ext/census/grpc_context.c
new file mode 100644
index 0000000000..98285ab2d5
--- /dev/null
+++ b/src/core/ext/census/grpc_context.c
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+#include <grpc/grpc.h>
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/call.h"
+
+void grpc_census_call_set_context(grpc_call *call, census_context *context) {
+ GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2,
+ (call, context));
+ if (census_enabled() == CENSUS_FEATURE_NONE) {
+ return;
+ }
+ if (context != NULL) {
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, NULL);
+ }
+}
+
+census_context *grpc_census_call_get_context(grpc_call *call) {
+ GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call));
+ return (census_context *)grpc_call_context_get(call, GRPC_CONTEXT_TRACING);
+}
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
new file mode 100644
index 0000000000..abfb3bb5f0
--- /dev/null
+++ b/src/core/ext/census/grpc_filter.c
@@ -0,0 +1,198 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/grpc_filter.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/census.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/time.h>
+
+#include "src/core/ext/census/census_interface.h"
+#include "src/core/ext/census/census_rpc_stats.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+typedef struct call_data {
+ census_op_id op_id;
+ census_context *ctxt;
+ gpr_timespec start_ts;
+ int error;
+
+ /* recv callback */
+ grpc_metadata_batch *recv_initial_metadata;
+ grpc_closure *on_done_recv;
+ grpc_closure finish_recv;
+} call_data;
+
+typedef struct channel_data { uint8_t unused; } channel_data;
+
+static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
+ call_data *calld,
+ channel_data *chand) {
+ grpc_linked_mdelem *m;
+ for (m = md->list.head; m != NULL; m = m->next) {
+ if (m->md->key == GRPC_MDSTR_PATH) {
+ gpr_log(GPR_DEBUG, "%s",
+ (const char *)GPR_SLICE_START_PTR(m->md->value->slice));
+ /* Add method tag here */
+ }
+ }
+}
+
+static void client_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ if (op->send_initial_metadata) {
+ extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand);
+ }
+}
+
+static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ client_mutate_op(elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
+ bool success) {
+ grpc_call_element *elem = ptr;
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ if (success) {
+ extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand);
+ }
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+}
+
+static void server_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+ if (op->recv_initial_metadata) {
+ /* substitute our callback for the op callback */
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->finish_recv;
+ }
+}
+
+static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ /* TODO(ctiller): this code fails. I don't know why. I expect it's
+ incomplete, and someone should look at it soon.
+
+ call_data *calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); */
+ server_mutate_op(elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ call_data *d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ memset(d, 0, sizeof(*d));
+ d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
+}
+
+static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
+}
+
+static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ call_data *d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ memset(d, 0, sizeof(*d));
+ d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
+ /* TODO(hongyu): call census_tracing_start_op here. */
+ grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
+}
+
+static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
+}
+
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(chand != NULL);
+}
+
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(chand != NULL);
+}
+
+const grpc_channel_filter grpc_client_census_filter = {
+ client_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ client_init_call_elem,
+ grpc_call_stack_ignore_set_pollset,
+ client_destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "census-client"};
+
+const grpc_channel_filter grpc_server_census_filter = {
+ server_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ server_init_call_elem,
+ grpc_call_stack_ignore_set_pollset,
+ server_destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "census-server"};
diff --git a/src/core/ext/census/grpc_filter.h b/src/core/ext/census/grpc_filter.h
new file mode 100644
index 0000000000..a39bd82224
--- /dev/null
+++ b/src/core/ext/census/grpc_filter.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_GRPC_FILTER_H
+#define GRPC_CORE_EXT_CENSUS_GRPC_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+/* Census filters: provides tracing and stats collection functionalities. It
+ needs to reside right below the surface filter in the channel stack. */
+extern const grpc_channel_filter grpc_client_census_filter;
+extern const grpc_channel_filter grpc_server_census_filter;
+
+#endif /* GRPC_CORE_EXT_CENSUS_GRPC_FILTER_H */
diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c
new file mode 100644
index 0000000000..0f15ecb2c2
--- /dev/null
+++ b/src/core/ext/census/grpc_plugin.c
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <limits.h>
+
+#include <grpc/census.h>
+
+#include "src/core/ext/census/grpc_filter.h"
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/surface/channel_init.h"
+
+static bool maybe_add_census_filter(grpc_channel_stack_builder *builder,
+ void *arg_must_be_null) {
+ const grpc_channel_args *args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ if (grpc_channel_args_is_census_enabled(args)) {
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &grpc_client_census_filter, NULL, NULL);
+ }
+ return true;
+}
+
+void census_grpc_plugin_init(void) {
+ /* Only initialize census if no one else has and some features are
+ * available. */
+ if (census_enabled() == CENSUS_FEATURE_NONE &&
+ census_supported() != CENSUS_FEATURE_NONE) {
+ if (census_initialize(census_supported())) { /* enable all features. */
+ gpr_log(GPR_ERROR, "Could not initialize census.");
+ }
+ }
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
+ maybe_add_census_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_add_census_filter, NULL);
+}
+
+void census_grpc_plugin_shutdown(void) { census_shutdown(); }
diff --git a/src/core/ext/census/hash_table.c b/src/core/ext/census/hash_table.c
new file mode 100644
index 0000000000..ee6fdfc6e8
--- /dev/null
+++ b/src/core/ext/census/hash_table.c
@@ -0,0 +1,303 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/hash_table.h"
+
+#include <stddef.h>
+#include <stdio.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+#define CENSUS_HT_NUM_BUCKETS 1999
+
+/* A single hash table data entry */
+typedef struct ht_entry {
+ census_ht_key key;
+ void *data;
+ struct ht_entry *next;
+} ht_entry;
+
+/* hash table bucket */
+typedef struct bucket {
+ /* NULL if bucket is empty */
+ ht_entry *next;
+ /* -1 if all buckets are empty. */
+ int32_t prev_non_empty_bucket;
+ /* -1 if all buckets are empty. */
+ int32_t next_non_empty_bucket;
+} bucket;
+
+struct unresizable_hash_table {
+ /* Number of entries in the table */
+ size_t size;
+ /* Number of buckets */
+ uint32_t num_buckets;
+ /* Array of buckets initialized at creation time. Memory consumption is
+ 16 bytes per bucket on a 64-bit platform. */
+ bucket *buckets;
+ /* Index of the first non-empty bucket. -1 iff size == 0. */
+ int32_t first_non_empty_bucket;
+ /* Index of the last non_empty bucket. -1 iff size == 0. */
+ int32_t last_non_empty_bucket;
+ /* Immutable options of this hash table, initialized at creation time. */
+ census_ht_option options;
+};
+
+typedef struct entry_locator {
+ int32_t bucket_idx;
+ int is_first_in_chain;
+ int found;
+ ht_entry *prev_entry;
+} entry_locator;
+
+/* Asserts if option is not valid. */
+void check_options(const census_ht_option *option) {
+ GPR_ASSERT(option != NULL);
+ GPR_ASSERT(option->num_buckets > 0);
+ GPR_ASSERT(option->key_type == CENSUS_HT_UINT64 ||
+ option->key_type == CENSUS_HT_POINTER);
+ if (option->key_type == CENSUS_HT_UINT64) {
+ GPR_ASSERT(option->hash == NULL);
+ } else if (option->key_type == CENSUS_HT_POINTER) {
+ GPR_ASSERT(option->hash != NULL);
+ GPR_ASSERT(option->compare_keys != NULL);
+ }
+}
+
+#define REMOVE_NEXT(options, ptr) \
+ do { \
+ ht_entry *tmp = (ptr)->next; \
+ (ptr)->next = tmp->next; \
+ delete_entry(options, tmp); \
+ } while (0)
+
+static void delete_entry(const census_ht_option *opt, ht_entry *p) {
+ if (opt->delete_data != NULL) {
+ opt->delete_data(p->data);
+ }
+ if (opt->delete_key != NULL) {
+ opt->delete_key(p->key.ptr);
+ }
+ gpr_free(p);
+}
+
+static uint64_t hash(const census_ht_option *opt, census_ht_key key) {
+ return opt->key_type == CENSUS_HT_UINT64 ? key.val : opt->hash(key.ptr);
+}
+
+census_ht *census_ht_create(const census_ht_option *option) {
+ int i;
+ census_ht *ret = NULL;
+ check_options(option);
+ ret = (census_ht *)gpr_malloc(sizeof(census_ht));
+ ret->size = 0;
+ ret->num_buckets = option->num_buckets;
+ ret->buckets = (bucket *)gpr_malloc(sizeof(bucket) * ret->num_buckets);
+ ret->options = *option;
+ /* initialize each bucket */
+ for (i = 0; i < ret->options.num_buckets; i++) {
+ ret->buckets[i].prev_non_empty_bucket = -1;
+ ret->buckets[i].next_non_empty_bucket = -1;
+ ret->buckets[i].next = NULL;
+ }
+ return ret;
+}
+
+static int32_t find_bucket_idx(const census_ht *ht, census_ht_key key) {
+ return hash(&ht->options, key) % ht->num_buckets;
+}
+
+static int keys_match(const census_ht_option *opt, const ht_entry *p,
+ const census_ht_key key) {
+ GPR_ASSERT(opt->key_type == CENSUS_HT_UINT64 ||
+ opt->key_type == CENSUS_HT_POINTER);
+ if (opt->key_type == CENSUS_HT_UINT64) return p->key.val == key.val;
+ return !opt->compare_keys((p->key).ptr, key.ptr);
+}
+
+static entry_locator ht_find(const census_ht *ht, census_ht_key key) {
+ entry_locator loc = {0, 0, 0, NULL};
+ int32_t idx = 0;
+ ht_entry *ptr = NULL;
+ GPR_ASSERT(ht != NULL);
+ idx = find_bucket_idx(ht, key);
+ ptr = ht->buckets[idx].next;
+ if (ptr == NULL) {
+ /* bucket is empty */
+ return loc;
+ }
+ if (keys_match(&ht->options, ptr, key)) {
+ loc.bucket_idx = idx;
+ loc.is_first_in_chain = 1;
+ loc.found = 1;
+ return loc;
+ } else {
+ for (; ptr->next != NULL; ptr = ptr->next) {
+ if (keys_match(&ht->options, ptr->next, key)) {
+ loc.bucket_idx = idx;
+ loc.is_first_in_chain = 0;
+ loc.found = 1;
+ loc.prev_entry = ptr;
+ return loc;
+ }
+ }
+ }
+ /* Could not find the key */
+ return loc;
+}
+
+void *census_ht_find(const census_ht *ht, census_ht_key key) {
+ entry_locator loc = ht_find(ht, key);
+ if (loc.found == 0) {
+ return NULL;
+ }
+ return loc.is_first_in_chain ? ht->buckets[loc.bucket_idx].next->data
+ : loc.prev_entry->next->data;
+}
+
+void census_ht_insert(census_ht *ht, census_ht_key key, void *data) {
+ int32_t idx = find_bucket_idx(ht, key);
+ ht_entry *ptr = NULL;
+ entry_locator loc = ht_find(ht, key);
+ if (loc.found) {
+ /* Replace old value with new value. */
+ ptr = loc.is_first_in_chain ? ht->buckets[loc.bucket_idx].next
+ : loc.prev_entry->next;
+ if (ht->options.delete_data != NULL) {
+ ht->options.delete_data(ptr->data);
+ }
+ ptr->data = data;
+ return;
+ }
+
+ /* first entry in the table. */
+ if (ht->size == 0) {
+ ht->buckets[idx].next_non_empty_bucket = -1;
+ ht->buckets[idx].prev_non_empty_bucket = -1;
+ ht->first_non_empty_bucket = idx;
+ ht->last_non_empty_bucket = idx;
+ } else if (ht->buckets[idx].next == NULL) {
+ /* first entry in the bucket. */
+ ht->buckets[ht->last_non_empty_bucket].next_non_empty_bucket = idx;
+ ht->buckets[idx].prev_non_empty_bucket = ht->last_non_empty_bucket;
+ ht->buckets[idx].next_non_empty_bucket = -1;
+ ht->last_non_empty_bucket = idx;
+ }
+ ptr = (ht_entry *)gpr_malloc(sizeof(ht_entry));
+ ptr->key = key;
+ ptr->data = data;
+ ptr->next = ht->buckets[idx].next;
+ ht->buckets[idx].next = ptr;
+ ht->size++;
+}
+
+void census_ht_erase(census_ht *ht, census_ht_key key) {
+ entry_locator loc = ht_find(ht, key);
+ if (loc.found == 0) {
+ /* noop if not found */
+ return;
+ }
+ ht->size--;
+ if (loc.is_first_in_chain) {
+ bucket *b = &ht->buckets[loc.bucket_idx];
+ GPR_ASSERT(b->next != NULL);
+ /* The only entry in the bucket */
+ if (b->next->next == NULL) {
+ int prev = b->prev_non_empty_bucket;
+ int next = b->next_non_empty_bucket;
+ if (prev != -1) {
+ ht->buckets[prev].next_non_empty_bucket = next;
+ } else {
+ ht->first_non_empty_bucket = next;
+ }
+ if (next != -1) {
+ ht->buckets[next].prev_non_empty_bucket = prev;
+ } else {
+ ht->last_non_empty_bucket = prev;
+ }
+ }
+ REMOVE_NEXT(&ht->options, b);
+ } else {
+ GPR_ASSERT(loc.prev_entry->next != NULL);
+ REMOVE_NEXT(&ht->options, loc.prev_entry);
+ }
+}
+
+/* Returns NULL if input table is empty. */
+census_ht_kv *census_ht_get_all_elements(const census_ht *ht, size_t *num) {
+ census_ht_kv *ret = NULL;
+ int i = 0;
+ int32_t idx = -1;
+ GPR_ASSERT(ht != NULL && num != NULL);
+ *num = ht->size;
+ if (*num == 0) {
+ return NULL;
+ }
+
+ ret = (census_ht_kv *)gpr_malloc(sizeof(census_ht_kv) * ht->size);
+ idx = ht->first_non_empty_bucket;
+ while (idx >= 0) {
+ ht_entry *ptr = ht->buckets[idx].next;
+ for (; ptr != NULL; ptr = ptr->next) {
+ ret[i].k = ptr->key;
+ ret[i].v = ptr->data;
+ i++;
+ }
+ idx = ht->buckets[idx].next_non_empty_bucket;
+ }
+ return ret;
+}
+
+static void ht_delete_entry_chain(const census_ht_option *options,
+ ht_entry *first) {
+ if (first == NULL) {
+ return;
+ }
+ if (first->next != NULL) {
+ ht_delete_entry_chain(options, first->next);
+ }
+ delete_entry(options, first);
+}
+
+void census_ht_destroy(census_ht *ht) {
+ unsigned i;
+ for (i = 0; i < ht->num_buckets; ++i) {
+ ht_delete_entry_chain(&ht->options, ht->buckets[i].next);
+ }
+ gpr_free(ht->buckets);
+ gpr_free(ht);
+}
+
+size_t census_ht_get_size(const census_ht *ht) { return ht->size; }
diff --git a/src/core/ext/census/hash_table.h b/src/core/ext/census/hash_table.h
new file mode 100644
index 0000000000..30ea4264a2
--- /dev/null
+++ b/src/core/ext/census/hash_table.h
@@ -0,0 +1,131 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_HASH_TABLE_H
+#define GRPC_CORE_EXT_CENSUS_HASH_TABLE_H
+
+#include <stddef.h>
+
+#include <grpc/support/port_platform.h>
+
+/* A chain based hash table with fixed number of buckets.
+ Your probably shouldn't use this code directly. It is implemented for the
+ use case in census trace store and stats store, where number of entries in
+ the table is in the scale of upto several thousands, entries are added and
+ removed from the table very frequently (~100k/s), the frequency of find()
+ operations is roughly several times of the frequency of insert() and erase()
+ Comparing to find(), the insert(), erase() and get_all_entries() operations
+ are much less freqent (<1/s).
+
+ Per bucket memory overhead is about (8 + sizeof(intptr_t) bytes.
+ Per entry memory overhead is about (8 + 2 * sizeof(intptr_t) bytes.
+
+ All functions are not thread-safe. Synchronization will be provided in the
+ upper layer (in trace store and stats store).
+*/
+
+/* Opaque hash table struct */
+typedef struct unresizable_hash_table census_ht;
+
+/* Currently, the hash_table can take two types of keys. (uint64 for trace
+ store and const char* for stats store). */
+typedef union {
+ uint64_t val;
+ void *ptr;
+} census_ht_key;
+
+typedef enum census_ht_key_type {
+ CENSUS_HT_UINT64 = 0,
+ CENSUS_HT_POINTER = 1
+} census_ht_key_type;
+
+typedef struct census_ht_option {
+ /* Type of hash key */
+ census_ht_key_type key_type;
+ /* Desired number of buckets, preferably a prime number */
+ int32_t num_buckets;
+ /* Fucntion to calculate uint64 hash value of the key. Only takes effect if
+ key_type is POINTER. */
+ uint64_t (*hash)(const void *);
+ /* Function to compare two keys, returns 0 iff equal. Only takes effect if
+ key_type is POINTER */
+ int (*compare_keys)(const void *k1, const void *k2);
+ /* Value deleter. NULL if no specialized delete function is needed. */
+ void (*delete_data)(void *);
+ /* Key deleter. NULL if table does not own the key. (e.g. key is part of the
+ value or key is not owned by the table.) */
+ void (*delete_key)(void *);
+} census_ht_option;
+
+/* Creates a hashtable with fixed number of buckets according to the settings
+ specified in 'options' arg. Function pointers "hash" and "compare_keys" must
+ be provided if key_type is POINTER. Asserts if fail to create. */
+census_ht *census_ht_create(const census_ht_option *options);
+
+/* Deletes hash table instance. Frees all dynamic memory owned by ht.*/
+void census_ht_destroy(census_ht *ht);
+
+/* Inserts the input key-val pair into hash_table. If an entry with the same key
+ exists in the table, the corresponding value will be overwritten by the input
+ val. */
+void census_ht_insert(census_ht *ht, census_ht_key key, void *val);
+
+/* Returns pointer to data, returns NULL if not found. */
+void *census_ht_find(const census_ht *ht, census_ht_key key);
+
+/* Erase hash table entry with input key. Noop if key is not found. */
+void census_ht_erase(census_ht *ht, census_ht_key key);
+
+typedef struct census_ht_kv {
+ census_ht_key k;
+ void *v;
+} census_ht_kv;
+
+/* Returns an array of pointers to all values in the hash table. Order of the
+ elements can be arbitrary. Sets 'num' to the size of returned array. Caller
+ owns returned array. */
+census_ht_kv *census_ht_get_all_elements(const census_ht *ht, size_t *num);
+
+/* Returns number of elements kept. */
+size_t census_ht_get_size(const census_ht *ht);
+
+/* Functor applied on each key-value pair while iterating through entries in the
+ table. The functor should not mutate data. */
+typedef void (*census_ht_itr_cb)(census_ht_key key, const void *val_ptr,
+ void *state);
+
+/* Iterates through all key-value pairs in the hash_table. The callback function
+ should not invalidate data entries. */
+uint64_t census_ht_for_all(const census_ht *ht, census_ht_itr_cb);
+
+#endif /* GRPC_CORE_EXT_CENSUS_HASH_TABLE_H */
diff --git a/src/core/ext/census/initialize.c b/src/core/ext/census/initialize.c
new file mode 100644
index 0000000000..896276e44a
--- /dev/null
+++ b/src/core/ext/census/initialize.c
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+
+static int features_enabled = CENSUS_FEATURE_NONE;
+
+int census_initialize(int features) {
+ if (features_enabled != CENSUS_FEATURE_NONE) {
+ // Must have been a previous call to census_initialize; return error
+ return 1;
+ }
+ features_enabled = features;
+ return 0;
+}
+
+void census_shutdown(void) { features_enabled = CENSUS_FEATURE_NONE; }
+
+int census_supported(void) {
+ /* TODO(aveitch): improve this as we implement features... */
+ return CENSUS_FEATURE_NONE;
+}
+
+int census_enabled(void) { return features_enabled; }
diff --git a/src/core/ext/census/mlog.c b/src/core/ext/census/mlog.c
new file mode 100644
index 0000000000..698b7096ab
--- /dev/null
+++ b/src/core/ext/census/mlog.c
@@ -0,0 +1,600 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Implements an efficient in-memory log, optimized for multiple writers and
+// a single reader. Available log space is divided up in blocks of
+// CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the following
+// three data structures:
+// - Free blocks (free_block_list)
+// - Blocks with unread data (dirty_block_list)
+// - Blocks currently attached to cores (core_local_blocks[])
+//
+// census_log_start_write() moves a block from core_local_blocks[] to the end of
+// dirty_block_list when block:
+// - is out-of-space OR
+// - has an incomplete record (an incomplete record occurs when a thread calls
+// census_log_start_write() and is context-switched before calling
+// census_log_end_write()
+// So, blocks in dirty_block_list are ordered, from oldest to newest, by the
+// time when block is detached from the core.
+//
+// census_log_read_next() first iterates over dirty_block_list and then
+// core_local_blocks[]. It moves completely read blocks from dirty_block_list
+// to free_block_list. Blocks in core_local_blocks[] are not freed, even when
+// completely read.
+//
+// If the log is configured to discard old records and free_block_list is empty,
+// census_log_start_write() iterates over dirty_block_list to allocate a
+// new block. It moves the oldest available block (no pending read/write) to
+// core_local_blocks[].
+//
+// core_local_block_struct is used to implement a map from core id to the block
+// associated with that core. This mapping is advisory. It is possible that the
+// block returned by this mapping is no longer associated with that core. This
+// mapping is updated, lazily, by census_log_start_write().
+//
+// Locking in block struct:
+//
+// Exclusive g_log.lock must be held before calling any functions operating on
+// block structs except census_log_start_write() and census_log_end_write().
+//
+// Writes to a block are serialized via writer_lock. census_log_start_write()
+// acquires this lock and census_log_end_write() releases it. On failure to
+// acquire the lock, writer allocates a new block for the current core and
+// updates core_local_block accordingly.
+//
+// Simultaneous read and write access is allowed. Readers can safely read up to
+// committed bytes (bytes_committed).
+//
+// reader_lock protects the block, currently being read, from getting recycled.
+// start_read() acquires reader_lock and end_read() releases the lock.
+//
+// Read/write access to a block is disabled via try_disable_access(). It returns
+// with both writer_lock and reader_lock held. These locks are subsequently
+// released by enable_access() to enable access to the block.
+//
+// A note on naming: Most function/struct names are prepended by cl_
+// (shorthand for census_log). Further, functions that manipulate structures
+// include the name of the structure, which will be passed as the first
+// argument. E.g. cl_block_initialize() will initialize a cl_block.
+
+#include "src/core/ext/census/mlog.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/cpu.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+#include <string.h>
+
+// End of platform specific code
+
+typedef struct census_log_block_list_struct {
+ struct census_log_block_list_struct* next;
+ struct census_log_block_list_struct* prev;
+ struct census_log_block* block;
+} cl_block_list_struct;
+
+typedef struct census_log_block {
+ // Pointer to underlying buffer.
+ char* buffer;
+ gpr_atm writer_lock;
+ gpr_atm reader_lock;
+ // Keeps completely written bytes. Declared atomic because accessed
+ // simultaneously by reader and writer.
+ gpr_atm bytes_committed;
+ // Bytes already read.
+ size_t bytes_read;
+ // Links for list.
+ cl_block_list_struct link;
+// We want this structure to be cacheline aligned. We assume the following
+// sizes for the various parts on 32/64bit systems:
+// type 32b size 64b size
+// char* 4 8
+// 3x gpr_atm 12 24
+// size_t 4 8
+// cl_block_list_struct 12 24
+// TOTAL 32 64
+//
+// Depending on the size of our cacheline and the architecture, we
+// selectively add char buffering to this structure. The size is checked
+// via assert in census_log_initialize().
+#if defined(GPR_ARCH_64)
+#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64)
+#else
+#if defined(GPR_ARCH_32)
+#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32)
+#else
+#error "Unknown architecture"
+#endif
+#endif
+#if CL_BLOCK_PAD_SIZE > 0
+ char padding[CL_BLOCK_PAD_SIZE];
+#endif
+} cl_block;
+
+// A list of cl_blocks, doubly-linked through cl_block::link.
+typedef struct census_log_block_list {
+ int32_t count; // Number of items in list.
+ cl_block_list_struct ht; // head/tail of linked list.
+} cl_block_list;
+
+// Cacheline aligned block pointers to avoid false sharing. Block pointer must
+// be initialized via set_block(), before calling other functions
+typedef struct census_log_core_local_block {
+ gpr_atm block;
+// Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8
+#if defined(GPR_ARCH_64)
+#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8)
+#else
+#if defined(GPR_ARCH_32)
+#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4)
+#else
+#error "Unknown architecture"
+#endif
+#endif
+#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0
+ char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE];
+#endif
+} cl_core_local_block;
+
+struct census_log {
+ int discard_old_records;
+ // Number of cores (aka hardware-contexts)
+ unsigned num_cores;
+ // number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log
+ uint32_t num_blocks;
+ cl_block* blocks; // Block metadata.
+ cl_core_local_block* core_local_blocks; // Keeps core to block mappings.
+ gpr_mu lock;
+ int initialized; // has log been initialized?
+ // Keeps the state of the reader iterator. A value of 0 indicates that
+ // iterator has reached the end. census_log_init_reader() resets the value
+ // to num_core to restart iteration.
+ uint32_t read_iterator_state;
+ // Points to the block being read. If non-NULL, the block is locked for
+ // reading(block_being_read_->reader_lock is held).
+ cl_block* block_being_read;
+ char* buffer;
+ cl_block_list free_block_list;
+ cl_block_list dirty_block_list;
+ gpr_atm out_of_space_count;
+};
+
+// Single internal log.
+static struct census_log g_log;
+
+// Functions that operate on an atomic memory location used as a lock.
+
+// Returns non-zero if lock is acquired.
+static int cl_try_lock(gpr_atm* lock) { return gpr_atm_acq_cas(lock, 0, 1); }
+
+static void cl_unlock(gpr_atm* lock) { gpr_atm_rel_store(lock, 0); }
+
+// Functions that operate on cl_core_local_block's.
+
+static void cl_core_local_block_set_block(cl_core_local_block* clb,
+ cl_block* block) {
+ gpr_atm_rel_store(&clb->block, (gpr_atm)block);
+}
+
+static cl_block* cl_core_local_block_get_block(cl_core_local_block* clb) {
+ return (cl_block*)gpr_atm_acq_load(&clb->block);
+}
+
+// Functions that operate on cl_block_list_struct's.
+
+static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
+ cl_block* block) {
+ bls->next = bls->prev = bls;
+ bls->block = block;
+}
+
+// Functions that operate on cl_block_list's.
+
+static void cl_block_list_initialize(cl_block_list* list) {
+ list->count = 0;
+ cl_block_list_struct_initialize(&list->ht, NULL);
+}
+
+// Returns head of *this, or NULL if empty.
+static cl_block* cl_block_list_head(cl_block_list* list) {
+ return list->ht.next->block;
+}
+
+// Insert element *e after *pos.
+static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos,
+ cl_block_list_struct* e) {
+ list->count++;
+ e->next = pos->next;
+ e->prev = pos;
+ e->next->prev = e;
+ e->prev->next = e;
+}
+
+// Insert block at the head of the list
+static void cl_block_list_insert_at_head(cl_block_list* list, cl_block* block) {
+ cl_block_list_insert(list, &list->ht, &block->link);
+}
+
+// Insert block at the tail of the list.
+static void cl_block_list_insert_at_tail(cl_block_list* list, cl_block* block) {
+ cl_block_list_insert(list, list->ht.prev, &block->link);
+}
+
+// Removes block *b. Requires *b be in the list.
+static void cl_block_list_remove(cl_block_list* list, cl_block* b) {
+ list->count--;
+ b->link.next->prev = b->link.prev;
+ b->link.prev->next = b->link.next;
+}
+
+// Functions that operate on cl_block's
+
+static void cl_block_initialize(cl_block* block, char* buffer) {
+ block->buffer = buffer;
+ gpr_atm_rel_store(&block->writer_lock, 0);
+ gpr_atm_rel_store(&block->reader_lock, 0);
+ gpr_atm_rel_store(&block->bytes_committed, 0);
+ block->bytes_read = 0;
+ cl_block_list_struct_initialize(&block->link, block);
+}
+
+// Guards against exposing partially written buffer to the reader.
+static void cl_block_set_bytes_committed(cl_block* block,
+ size_t bytes_committed) {
+ gpr_atm_rel_store(&block->bytes_committed, (gpr_atm)bytes_committed);
+}
+
+static size_t cl_block_get_bytes_committed(cl_block* block) {
+ return (size_t)gpr_atm_acq_load(&block->bytes_committed);
+}
+
+// Tries to disable future read/write access to this block. Succeeds if:
+// - no in-progress write AND
+// - no in-progress read AND
+// - 'discard_data' set to true OR no unread data
+// On success, clears the block state and returns with writer_lock_ and
+// reader_lock_ held. These locks are released by a subsequent
+// cl_block_access_enable() call.
+static bool cl_block_try_disable_access(cl_block* block, int discard_data) {
+ if (!cl_try_lock(&block->writer_lock)) {
+ return false;
+ }
+ if (!cl_try_lock(&block->reader_lock)) {
+ cl_unlock(&block->writer_lock);
+ return false;
+ }
+ if (!discard_data &&
+ (block->bytes_read != cl_block_get_bytes_committed(block))) {
+ cl_unlock(&block->reader_lock);
+ cl_unlock(&block->writer_lock);
+ return false;
+ }
+ cl_block_set_bytes_committed(block, 0);
+ block->bytes_read = 0;
+ return true;
+}
+
+static void cl_block_enable_access(cl_block* block) {
+ cl_unlock(&block->reader_lock);
+ cl_unlock(&block->writer_lock);
+}
+
+// Returns with writer_lock held.
+static void* cl_block_start_write(cl_block* block, size_t size) {
+ if (!cl_try_lock(&block->writer_lock)) {
+ return NULL;
+ }
+ size_t bytes_committed = cl_block_get_bytes_committed(block);
+ if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) {
+ cl_unlock(&block->writer_lock);
+ return NULL;
+ }
+ return block->buffer + bytes_committed;
+}
+
+// Releases writer_lock and increments committed bytes by 'bytes_written'.
+// 'bytes_written' must be <= 'size' specified in the corresponding
+// StartWrite() call. This function is thread-safe.
+static void cl_block_end_write(cl_block* block, size_t bytes_written) {
+ cl_block_set_bytes_committed(
+ block, cl_block_get_bytes_committed(block) + bytes_written);
+ cl_unlock(&block->writer_lock);
+}
+
+// Returns a pointer to the first unread byte in buffer. The number of bytes
+// available are returned in 'bytes_available'. Acquires reader lock that is
+// released by a subsequent cl_block_end_read() call. Returns NULL if:
+// - read in progress
+// - no data available
+static void* cl_block_start_read(cl_block* block, size_t* bytes_available) {
+ if (!cl_try_lock(&block->reader_lock)) {
+ return NULL;
+ }
+ // bytes_committed may change from under us. Use bytes_available to update
+ // bytes_read below.
+ size_t bytes_committed = cl_block_get_bytes_committed(block);
+ GPR_ASSERT(bytes_committed >= block->bytes_read);
+ *bytes_available = bytes_committed - block->bytes_read;
+ if (*bytes_available == 0) {
+ cl_unlock(&block->reader_lock);
+ return NULL;
+ }
+ void* record = block->buffer + block->bytes_read;
+ block->bytes_read += *bytes_available;
+ return record;
+}
+
+static void cl_block_end_read(cl_block* block) {
+ cl_unlock(&block->reader_lock);
+}
+
+// Internal functions operating on g_log
+
+// Allocates a new free block (or recycles an available dirty block if log is
+// configured to discard old records). Returns NULL if out-of-space.
+static cl_block* cl_allocate_block(void) {
+ cl_block* block = cl_block_list_head(&g_log.free_block_list);
+ if (block != NULL) {
+ cl_block_list_remove(&g_log.free_block_list, block);
+ return block;
+ }
+ if (!g_log.discard_old_records) {
+ // No free block and log is configured to keep old records.
+ return NULL;
+ }
+ // Recycle dirty block. Start from the oldest.
+ for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL;
+ block = block->link.next->block) {
+ if (cl_block_try_disable_access(block, 1 /* discard data */)) {
+ cl_block_list_remove(&g_log.dirty_block_list, block);
+ return block;
+ }
+ }
+ return NULL;
+}
+
+// Allocates a new block and updates core id => block mapping. 'old_block'
+// points to the block that the caller thinks is attached to
+// 'core_id'. 'old_block' may be NULL. Returns true if:
+// - allocated a new block OR
+// - 'core_id' => 'old_block' mapping changed (another thread allocated a
+// block before lock was acquired).
+static bool cl_allocate_core_local_block(uint32_t core_id,
+ cl_block* old_block) {
+ // Now that we have the lock, check if core-local mapping has changed.
+ cl_core_local_block* core_local_block = &g_log.core_local_blocks[core_id];
+ cl_block* block = cl_core_local_block_get_block(core_local_block);
+ if ((block != NULL) && (block != old_block)) {
+ return true;
+ }
+ if (block != NULL) {
+ cl_core_local_block_set_block(core_local_block, NULL);
+ cl_block_list_insert_at_tail(&g_log.dirty_block_list, block);
+ }
+ block = cl_allocate_block();
+ if (block == NULL) {
+ return false;
+ }
+ cl_core_local_block_set_block(core_local_block, block);
+ cl_block_enable_access(block);
+ return true;
+}
+
+static cl_block* cl_get_block(void* record) {
+ uintptr_t p = (uintptr_t)((char*)record - g_log.buffer);
+ uintptr_t index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE;
+ return &g_log.blocks[index];
+}
+
+// Gets the next block to read and tries to free 'prev' block (if not NULL).
+// Returns NULL if reached the end.
+static cl_block* cl_next_block_to_read(cl_block* prev) {
+ cl_block* block = NULL;
+ if (g_log.read_iterator_state == g_log.num_cores) {
+ // We are traversing dirty list; find the next dirty block.
+ if (prev != NULL) {
+ // Try to free the previous block if there is no unread data. This
+ // block
+ // may have unread data if previously incomplete record completed
+ // between
+ // read_next() calls.
+ block = prev->link.next->block;
+ if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) {
+ cl_block_list_remove(&g_log.dirty_block_list, prev);
+ cl_block_list_insert_at_head(&g_log.free_block_list, prev);
+ }
+ } else {
+ block = cl_block_list_head(&g_log.dirty_block_list);
+ }
+ if (block != NULL) {
+ return block;
+ }
+ // We are done with the dirty list; moving on to core-local blocks.
+ }
+ while (g_log.read_iterator_state > 0) {
+ g_log.read_iterator_state--;
+ block = cl_core_local_block_get_block(
+ &g_log.core_local_blocks[g_log.read_iterator_state]);
+ if (block != NULL) {
+ return block;
+ }
+ }
+ return NULL;
+}
+
+#define CL_LOG_2_MB 20 // 2^20 = 1MB
+
+// External functions: primary stats_log interface
+void census_log_initialize(size_t size_in_mb, int discard_old_records) {
+ // Check cacheline alignment.
+ GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0);
+ GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0);
+ GPR_ASSERT(!g_log.initialized);
+ g_log.discard_old_records = discard_old_records;
+ g_log.num_cores = gpr_cpu_num_cores();
+ // Ensure that we will not get any overflow in calaculating num_blocks
+ GPR_ASSERT(CL_LOG_2_MB >= CENSUS_LOG_2_MAX_RECORD_SIZE);
+ GPR_ASSERT(size_in_mb < 1000);
+ // Ensure at least 2x as many blocks as there are cores.
+ g_log.num_blocks =
+ (uint32_t)GPR_MAX(2 * g_log.num_cores, (size_in_mb << CL_LOG_2_MB) >>
+ CENSUS_LOG_2_MAX_RECORD_SIZE);
+ gpr_mu_init(&g_log.lock);
+ g_log.read_iterator_state = 0;
+ g_log.block_being_read = NULL;
+ g_log.core_local_blocks = (cl_core_local_block*)gpr_malloc_aligned(
+ g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG);
+ memset(g_log.core_local_blocks, 0,
+ g_log.num_cores * sizeof(cl_core_local_block));
+ g_log.blocks = (cl_block*)gpr_malloc_aligned(
+ g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE_LOG);
+ memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block));
+ g_log.buffer = gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
+ memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
+ cl_block_list_initialize(&g_log.free_block_list);
+ cl_block_list_initialize(&g_log.dirty_block_list);
+ for (uint32_t i = 0; i < g_log.num_blocks; ++i) {
+ cl_block* block = g_log.blocks + i;
+ cl_block_initialize(block, g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * i));
+ cl_block_try_disable_access(block, 1 /* discard data */);
+ cl_block_list_insert_at_tail(&g_log.free_block_list, block);
+ }
+ gpr_atm_rel_store(&g_log.out_of_space_count, 0);
+ g_log.initialized = 1;
+}
+
+void census_log_shutdown(void) {
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_destroy(&g_log.lock);
+ gpr_free_aligned(g_log.core_local_blocks);
+ g_log.core_local_blocks = NULL;
+ gpr_free_aligned(g_log.blocks);
+ g_log.blocks = NULL;
+ gpr_free(g_log.buffer);
+ g_log.buffer = NULL;
+ g_log.initialized = 0;
+}
+
+void* census_log_start_write(size_t size) {
+ // Used to bound number of times block allocation is attempted.
+ GPR_ASSERT(size > 0);
+ GPR_ASSERT(g_log.initialized);
+ if (size > CENSUS_LOG_MAX_RECORD_SIZE) {
+ return NULL;
+ }
+ uint32_t attempts_remaining = g_log.num_blocks;
+ uint32_t core_id = gpr_cpu_current_cpu();
+ do {
+ void* record = NULL;
+ cl_block* block =
+ cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]);
+ if (block && (record = cl_block_start_write(block, size))) {
+ return record;
+ }
+ // Need to allocate a new block. We are here if:
+ // - No block associated with the core OR
+ // - Write in-progress on the block OR
+ // - block is out of space
+ gpr_mu_lock(&g_log.lock);
+ bool allocated = cl_allocate_core_local_block(core_id, block);
+ gpr_mu_unlock(&g_log.lock);
+ if (!allocated) {
+ gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
+ return NULL;
+ }
+ } while (attempts_remaining--);
+ // Give up.
+ gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
+ return NULL;
+}
+
+void census_log_end_write(void* record, size_t bytes_written) {
+ GPR_ASSERT(g_log.initialized);
+ cl_block_end_write(cl_get_block(record), bytes_written);
+}
+
+void census_log_init_reader(void) {
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_lock(&g_log.lock);
+ // If a block is locked for reading unlock it.
+ if (g_log.block_being_read != NULL) {
+ cl_block_end_read(g_log.block_being_read);
+ g_log.block_being_read = NULL;
+ }
+ g_log.read_iterator_state = g_log.num_cores;
+ gpr_mu_unlock(&g_log.lock);
+}
+
+const void* census_log_read_next(size_t* bytes_available) {
+ GPR_ASSERT(g_log.initialized);
+ gpr_mu_lock(&g_log.lock);
+ if (g_log.block_being_read != NULL) {
+ cl_block_end_read(g_log.block_being_read);
+ }
+ do {
+ g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read);
+ if (g_log.block_being_read != NULL) {
+ void* record =
+ cl_block_start_read(g_log.block_being_read, bytes_available);
+ if (record != NULL) {
+ gpr_mu_unlock(&g_log.lock);
+ return record;
+ }
+ }
+ } while (g_log.block_being_read != NULL);
+ gpr_mu_unlock(&g_log.lock);
+ return NULL;
+}
+
+size_t census_log_remaining_space(void) {
+ GPR_ASSERT(g_log.initialized);
+ size_t space = 0;
+ gpr_mu_lock(&g_log.lock);
+ if (g_log.discard_old_records) {
+ // Remaining space is not meaningful; just return the entire log space.
+ space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE;
+ } else {
+ GPR_ASSERT(g_log.free_block_list.count >= 0);
+ space = (size_t)g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE;
+ }
+ gpr_mu_unlock(&g_log.lock);
+ return space;
+}
+
+int64_t census_log_out_of_space_count(void) {
+ GPR_ASSERT(g_log.initialized);
+ return gpr_atm_acq_load(&g_log.out_of_space_count);
+}
diff --git a/src/core/ext/census/mlog.h b/src/core/ext/census/mlog.h
new file mode 100644
index 0000000000..a256426f91
--- /dev/null
+++ b/src/core/ext/census/mlog.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* A very fast in-memory log, optimized for multiple writers. */
+
+#ifndef GRPC_CORE_EXT_CENSUS_MLOG_H
+#define GRPC_CORE_EXT_CENSUS_MLOG_H
+
+#include <grpc/support/port_platform.h>
+#include <stddef.h>
+
+/* Maximum record size, in bytes. */
+#define CENSUS_LOG_2_MAX_RECORD_SIZE 14 /* 2^14 = 16KB */
+#define CENSUS_LOG_MAX_RECORD_SIZE (1 << CENSUS_LOG_2_MAX_RECORD_SIZE)
+
+/* Initialize the statistics logging subsystem with the given log size. A log
+ size of 0 will result in the smallest possible log for the platform
+ (approximately CENSUS_LOG_MAX_RECORD_SIZE * gpr_cpu_num_cores()). If
+ discard_old_records is non-zero, then new records will displace older ones
+ when the log is full. This function must be called before any other
+ census_log functions.
+*/
+void census_log_initialize(size_t size_in_mb, int discard_old_records);
+
+/* Shutdown the logging subsystem. Caller must ensure that:
+ - no in progress or future call to any census_log functions
+ - no incomplete records
+*/
+void census_log_shutdown(void);
+
+/* Allocates and returns a 'size' bytes record and marks it in use. A
+ subsequent census_log_end_write() marks the record complete. The
+ 'bytes_written' census_log_end_write() argument must be <=
+ 'size'. Returns NULL if out-of-space AND:
+ - log is configured to keep old records OR
+ - all blocks are pinned by incomplete records.
+*/
+void* census_log_start_write(size_t size);
+
+void census_log_end_write(void* record, size_t bytes_written);
+
+void census_log_init_reader(void);
+
+/* census_log_read_next() iterates over blocks with data and for each block
+ returns a pointer to the first unread byte. The number of bytes that can be
+ read are returned in 'bytes_available'. Reader is expected to read all
+ available data. Reading the data consumes it i.e. it cannot be read again.
+ census_log_read_next() returns NULL if the end is reached i.e last block
+ is read. census_log_init_reader() starts the iteration or aborts the
+ current iteration.
+*/
+const void* census_log_read_next(size_t* bytes_available);
+
+/* Returns estimated remaining space across all blocks, in bytes. If log is
+ configured to discard old records, returns total log space. Otherwise,
+ returns space available in empty blocks (partially filled blocks are
+ treated as full).
+*/
+size_t census_log_remaining_space(void);
+
+/* Returns the number of times gprc_stats_log_start_write() failed due to
+ out-of-space. */
+int64_t census_log_out_of_space_count(void);
+
+#endif /* GRPC_CORE_EXT_CENSUS_MLOG_H */
diff --git a/src/core/ext/census/operation.c b/src/core/ext/census/operation.c
new file mode 100644
index 0000000000..5c58704372
--- /dev/null
+++ b/src/core/ext/census/operation.c
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+
+/* TODO(aveitch): These are all placeholder implementations. */
+
+census_timestamp census_start_rpc_op_timestamp(void) {
+ census_timestamp ct;
+ /* TODO(aveitch): assumes gpr_timespec implementation of census_timestamp. */
+ ct.ts = gpr_now(GPR_CLOCK_MONOTONIC);
+ return ct;
+}
+
+census_context *census_start_client_rpc_op(
+ const census_context *context, int64_t rpc_name_id,
+ const census_rpc_name_info *rpc_name_info, const char *peer, int trace_mask,
+ const census_timestamp *start_time) {
+ return NULL;
+}
+
+census_context *census_start_server_rpc_op(
+ const char *buffer, int64_t rpc_name_id,
+ const census_rpc_name_info *rpc_name_info, const char *peer, int trace_mask,
+ census_timestamp *start_time) {
+ return NULL;
+}
+
+census_context *census_start_op(census_context *context, const char *family,
+ const char *name, int trace_mask) {
+ return NULL;
+}
+
+void census_end_op(census_context *context, int status) {}
diff --git a/src/core/ext/census/placeholders.c b/src/core/ext/census/placeholders.c
new file mode 100644
index 0000000000..fe23d13971
--- /dev/null
+++ b/src/core/ext/census/placeholders.c
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+
+#include <grpc/support/log.h>
+
+/* Placeholders for the pending APIs */
+
+int census_get_trace_record(census_trace_record *trace_record) {
+ (void)trace_record;
+ abort();
+}
+
+void census_record_values(census_context *context, census_value *values,
+ size_t nvalues) {
+ (void)context;
+ (void)values;
+ (void)nvalues;
+ abort();
+}
+
+void census_set_rpc_client_peer(census_context *context, const char *peer) {
+ (void)context;
+ (void)peer;
+ abort();
+}
+
+void census_trace_scan_end() { abort(); }
+
+int census_trace_scan_start(int consume) {
+ (void)consume;
+ abort();
+}
+
+const census_aggregation *census_view_aggregrations(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+census_view *census_view_create(uint32_t metric_id, const census_context *tags,
+ const census_aggregation *aggregations,
+ size_t naggregations) {
+ (void)metric_id;
+ (void)tags;
+ (void)aggregations;
+ (void)naggregations;
+ abort();
+}
+
+const census_context *census_view_tags(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+void census_view_delete(census_view *view) {
+ (void)view;
+ abort();
+}
+
+const census_view_data *census_view_get_data(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+size_t census_view_metric(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+size_t census_view_naggregations(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+void census_view_reset(census_view *view) {
+ (void)view;
+ abort();
+}
diff --git a/src/core/ext/census/rpc_metric_id.h b/src/core/ext/census/rpc_metric_id.h
new file mode 100644
index 0000000000..888ec500a7
--- /dev/null
+++ b/src/core/ext/census/rpc_metric_id.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_RPC_METRIC_ID_H
+#define GRPC_CORE_EXT_CENSUS_RPC_METRIC_ID_H
+
+/* Metric ID's used for RPC measurements. */
+/* Count of client requests sent. */
+#define CENSUS_METRIC_RPC_CLIENT_REQUESTS ((uint32_t)0)
+/* Count of server requests sent. */
+#define CENSUS_METRIC_RPC_SERVER_REQUESTS ((uint32_t)1)
+/* Client error counts. */
+#define CENSUS_METRIC_RPC_CLIENT_ERRORS ((uint32_t)2)
+/* Server error counts. */
+#define CENSUS_METRIC_RPC_SERVER_ERRORS ((uint32_t)3)
+/* Client side request latency. */
+#define CENSUS_METRIC_RPC_CLIENT_LATENCY ((uint32_t)4)
+/* Server side request latency. */
+#define CENSUS_METRIC_RPC_SERVER_LATENCY ((uint32_t)5)
+
+#endif /* GRPC_CORE_EXT_CENSUS_RPC_METRIC_ID_H */
diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c
new file mode 100644
index 0000000000..3b5d6dab2b
--- /dev/null
+++ b/src/core/ext/census/tracing.c
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+
+/* TODO(aveitch): These are all placeholder implementations. */
+
+int census_trace_mask(const census_context *context) {
+ return CENSUS_TRACE_MASK_NONE;
+}
+
+void census_set_trace_mask(int trace_mask) {}
+
+void census_trace_print(census_context *context, uint32_t type,
+ const char *buffer, size_t n) {}
diff --git a/src/core/ext/census/window_stats.c b/src/core/ext/census/window_stats.c
new file mode 100644
index 0000000000..5f7bd9952e
--- /dev/null
+++ b/src/core/ext/census/window_stats.c
@@ -0,0 +1,316 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/window_stats.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include <math.h>
+#include <stddef.h>
+#include <string.h>
+
+/* typedefs make typing long names easier. Use cws (for census_window_stats) */
+typedef census_window_stats_stat_info cws_stat_info;
+typedef struct census_window_stats_sum cws_sum;
+
+/* Each interval is composed of a number of buckets, which hold a count of
+ entries and a single statistic */
+typedef struct census_window_stats_bucket {
+ int64_t count;
+ void *statistic;
+} cws_bucket;
+
+/* Each interval has a set of buckets, and the variables needed to keep
+ track of their current state */
+typedef struct census_window_stats_interval_stats {
+ /* The buckets. There will be 'granularity' + 1 of these. */
+ cws_bucket *buckets;
+ /* Index of the bucket containing the smallest time interval. */
+ int bottom_bucket;
+ /* The smallest time storable in the current window. */
+ int64_t bottom;
+ /* The largest time storable in the current window + 1ns */
+ int64_t top;
+ /* The width of each bucket in ns. */
+ int64_t width;
+} cws_interval_stats;
+
+typedef struct census_window_stats {
+ /* Number of intervals. */
+ int nintervals;
+ /* Number of buckets in each interval. 'granularity' + 1. */
+ int nbuckets;
+ /* Record of stat_info. */
+ cws_stat_info stat_info;
+ /* Stats for each interval. */
+ cws_interval_stats *interval_stats;
+ /* The time the newset stat was recorded. */
+ int64_t newest_time;
+} window_stats;
+
+/* Calculate an actual bucket index from a logical index 'IDX'. Other
+ parameters supply information on the interval struct and overall stats. */
+#define BUCKET_IDX(IS, IDX, WSTATS) \
+ ((IS->bottom_bucket + (IDX)) % WSTATS->nbuckets)
+
+/* The maximum seconds value we can have in a valid timespec. More than this
+ will result in overflow in timespec_to_ns(). This works out to ~292 years.
+ TODO: consider using doubles instead of int64. */
+static int64_t max_seconds = (GPR_INT64_MAX - GPR_NS_PER_SEC) / GPR_NS_PER_SEC;
+
+static int64_t timespec_to_ns(const gpr_timespec ts) {
+ if (ts.tv_sec > max_seconds) {
+ return GPR_INT64_MAX - 1;
+ }
+ return ts.tv_sec * GPR_NS_PER_SEC + ts.tv_nsec;
+}
+
+static void cws_initialize_statistic(void *statistic,
+ const cws_stat_info *stat_info) {
+ if (stat_info->stat_initialize == NULL) {
+ memset(statistic, 0, stat_info->stat_size);
+ } else {
+ stat_info->stat_initialize(statistic);
+ }
+}
+
+/* Create and initialize a statistic */
+static void *cws_create_statistic(const cws_stat_info *stat_info) {
+ void *stat = gpr_malloc(stat_info->stat_size);
+ cws_initialize_statistic(stat, stat_info);
+ return stat;
+}
+
+window_stats *census_window_stats_create(int nintervals,
+ const gpr_timespec intervals[],
+ int granularity,
+ const cws_stat_info *stat_info) {
+ window_stats *ret;
+ int i;
+ /* validate inputs */
+ GPR_ASSERT(nintervals > 0 && granularity > 2 && intervals != NULL &&
+ stat_info != NULL);
+ for (i = 0; i < nintervals; i++) {
+ int64_t ns = timespec_to_ns(intervals[i]);
+ GPR_ASSERT(intervals[i].tv_sec >= 0 && intervals[i].tv_nsec >= 0 &&
+ intervals[i].tv_nsec < GPR_NS_PER_SEC && ns >= 100 &&
+ granularity * 10 <= ns);
+ }
+ /* Allocate and initialize relevant data structures */
+ ret = (window_stats *)gpr_malloc(sizeof(window_stats));
+ ret->nintervals = nintervals;
+ ret->nbuckets = granularity + 1;
+ ret->stat_info = *stat_info;
+ ret->interval_stats =
+ (cws_interval_stats *)gpr_malloc(nintervals * sizeof(cws_interval_stats));
+ for (i = 0; i < nintervals; i++) {
+ int64_t size_ns = timespec_to_ns(intervals[i]);
+ cws_interval_stats *is = ret->interval_stats + i;
+ cws_bucket *buckets = is->buckets =
+ (cws_bucket *)gpr_malloc(ret->nbuckets * sizeof(cws_bucket));
+ int b;
+ for (b = 0; b < ret->nbuckets; b++) {
+ buckets[b].statistic = cws_create_statistic(stat_info);
+ buckets[b].count = 0;
+ }
+ is->bottom_bucket = 0;
+ is->bottom = 0;
+ is->width = size_ns / granularity;
+ /* Check for possible overflow issues, and maximize interval size if the
+ user requested something large enough. */
+ if ((GPR_INT64_MAX - is->width) > size_ns) {
+ is->top = size_ns + is->width;
+ } else {
+ is->top = GPR_INT64_MAX;
+ is->width = GPR_INT64_MAX / (granularity + 1);
+ }
+ /* If size doesn't divide evenly, we can have a width slightly too small;
+ better to have it slightly large. */
+ if ((size_ns - (granularity + 1) * is->width) > 0) {
+ is->width += 1;
+ }
+ }
+ ret->newest_time = 0;
+ return ret;
+}
+
+/* When we try adding a measurement above the current interval range, we
+ need to "shift" the buckets sufficiently to cover the new range. */
+static void cws_shift_buckets(const window_stats *wstats,
+ cws_interval_stats *is, int64_t when_ns) {
+ int i;
+ /* number of bucket time widths to "shift" */
+ int shift;
+ /* number of buckets to clear */
+ int nclear;
+ GPR_ASSERT(when_ns >= is->top);
+ /* number of bucket time widths to "shift" */
+ shift = ((when_ns - is->top) / is->width) + 1;
+ /* number of buckets to clear - limited by actual number of buckets */
+ nclear = GPR_MIN(shift, wstats->nbuckets);
+ for (i = 0; i < nclear; i++) {
+ int b = BUCKET_IDX(is, i, wstats);
+ is->buckets[b].count = 0;
+ cws_initialize_statistic(is->buckets[b].statistic, &wstats->stat_info);
+ }
+ /* adjust top/bottom times and current bottom bucket */
+ is->bottom_bucket = BUCKET_IDX(is, shift, wstats);
+ is->top += shift * is->width;
+ is->bottom += shift * is->width;
+}
+
+void census_window_stats_add(window_stats *wstats, const gpr_timespec when,
+ const void *stat_value) {
+ int i;
+ int64_t when_ns = timespec_to_ns(when);
+ GPR_ASSERT(wstats->interval_stats != NULL);
+ for (i = 0; i < wstats->nintervals; i++) {
+ cws_interval_stats *is = wstats->interval_stats + i;
+ cws_bucket *bucket;
+ if (when_ns < is->bottom) { /* Below smallest time in interval: drop */
+ continue;
+ }
+ if (when_ns >= is->top) { /* above limit: shift buckets */
+ cws_shift_buckets(wstats, is, when_ns);
+ }
+ /* Add the stat. */
+ GPR_ASSERT(is->bottom <= when_ns && when_ns < is->top);
+ bucket = is->buckets +
+ BUCKET_IDX(is, (when_ns - is->bottom) / is->width, wstats);
+ bucket->count++;
+ wstats->stat_info.stat_add(bucket->statistic, stat_value);
+ }
+ if (when_ns > wstats->newest_time) {
+ wstats->newest_time = when_ns;
+ }
+}
+
+/* Add a specific bucket contents to an accumulating total. */
+static void cws_add_bucket_to_sum(cws_sum *sum, const cws_bucket *bucket,
+ const cws_stat_info *stat_info) {
+ sum->count += bucket->count;
+ stat_info->stat_add(sum->statistic, bucket->statistic);
+}
+
+/* Add a proportion to an accumulating sum. */
+static void cws_add_proportion_to_sum(double p, cws_sum *sum,
+ const cws_bucket *bucket,
+ const cws_stat_info *stat_info) {
+ sum->count += p * bucket->count;
+ stat_info->stat_add_proportion(p, sum->statistic, bucket->statistic);
+}
+
+void census_window_stats_get_sums(const window_stats *wstats,
+ const gpr_timespec when, cws_sum sums[]) {
+ int i;
+ int64_t when_ns = timespec_to_ns(when);
+ GPR_ASSERT(wstats->interval_stats != NULL);
+ for (i = 0; i < wstats->nintervals; i++) {
+ int when_bucket;
+ int new_bucket;
+ double last_proportion = 1.0;
+ double bottom_proportion;
+ cws_interval_stats *is = wstats->interval_stats + i;
+ cws_sum *sum = sums + i;
+ sum->count = 0;
+ cws_initialize_statistic(sum->statistic, &wstats->stat_info);
+ if (when_ns < is->bottom) {
+ continue;
+ }
+ if (when_ns >= is->top) {
+ cws_shift_buckets(wstats, is, when_ns);
+ }
+ /* Calculating the appropriate amount of which buckets to use can get
+ complicated. Essentially there are two cases:
+ 1) if the "top" bucket (new_bucket, where the newest additions to the
+ stats recorded are entered) corresponds to 'when', then we need
+ to take a proportion of it - (if when < newest_time) or the full
+ thing. We also (possibly) need to take a corresponding
+ proportion of the bottom bucket.
+ 2) Other cases, we just take a straight proportion.
+ */
+ when_bucket = (when_ns - is->bottom) / is->width;
+ new_bucket = (wstats->newest_time - is->bottom) / is->width;
+ if (new_bucket == when_bucket) {
+ int64_t bottom_bucket_time = is->bottom + when_bucket * is->width;
+ if (when_ns < wstats->newest_time) {
+ last_proportion = (double)(when_ns - bottom_bucket_time) /
+ (double)(wstats->newest_time - bottom_bucket_time);
+ bottom_proportion =
+ (double)(is->width - (when_ns - bottom_bucket_time)) / is->width;
+ } else {
+ bottom_proportion =
+ (double)(is->width - (wstats->newest_time - bottom_bucket_time)) /
+ is->width;
+ }
+ } else {
+ last_proportion =
+ (double)(when_ns + 1 - is->bottom - when_bucket * is->width) /
+ is->width;
+ bottom_proportion = 1.0 - last_proportion;
+ }
+ cws_add_proportion_to_sum(last_proportion, sum,
+ is->buckets + BUCKET_IDX(is, when_bucket, wstats),
+ &wstats->stat_info);
+ if (when_bucket != 0) { /* last bucket isn't also bottom bucket */
+ int b;
+ /* Add all of "bottom" bucket if we are looking at a subset of the
+ full interval, or a proportion if we are adding full interval. */
+ cws_add_proportion_to_sum(
+ (when_bucket == wstats->nbuckets - 1 ? bottom_proportion : 1.0), sum,
+ is->buckets + is->bottom_bucket, &wstats->stat_info);
+ /* Add all the remaining buckets (everything but top and bottom). */
+ for (b = 1; b < when_bucket; b++) {
+ cws_add_bucket_to_sum(sum, is->buckets + BUCKET_IDX(is, b, wstats),
+ &wstats->stat_info);
+ }
+ }
+ }
+}
+
+void census_window_stats_destroy(window_stats *wstats) {
+ int i;
+ GPR_ASSERT(wstats->interval_stats != NULL);
+ for (i = 0; i < wstats->nintervals; i++) {
+ int b;
+ for (b = 0; b < wstats->nbuckets; b++) {
+ gpr_free(wstats->interval_stats[i].buckets[b].statistic);
+ }
+ gpr_free(wstats->interval_stats[i].buckets);
+ }
+ gpr_free(wstats->interval_stats);
+ /* Ensure any use-after free triggers assert. */
+ wstats->interval_stats = NULL;
+ gpr_free(wstats);
+}
diff --git a/src/core/ext/census/window_stats.h b/src/core/ext/census/window_stats.h
new file mode 100644
index 0000000000..25658c9ce0
--- /dev/null
+++ b/src/core/ext/census/window_stats.h
@@ -0,0 +1,173 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_WINDOW_STATS_H
+#define GRPC_CORE_EXT_CENSUS_WINDOW_STATS_H
+
+#include <grpc/support/time.h>
+
+/* Keep rolling sums of a user-defined statistic (containing a number of
+ measurements) over a a number of time intervals ("windows"). For example,
+ you can use a window_stats object to answer questions such as
+ "Approximately how many RPCs/s did I receive over the past minute, and
+ approximately how many bytes did I send out over that period?".
+
+ The type of data to record, and the time intervals to keep are specified
+ when creating the object via a call to census_window_stats_create().
+
+ A window's interval is divided into one or more "buckets"; the interval
+ must be divisible by the number of buckets. Internally, these buckets
+ control the granularity of window_stats' measurements. Increasing the
+ number of buckets lets the object respond more quickly to changes in the
+ overall rate of data added into the object, at the cost of additional
+ memory usage.
+
+ Here's some code which keeps one minute/hour measurements for two values
+ (latency in seconds and bytes transferred), with each interval divided into
+ 4 buckets.
+
+ typedef struct my_stat {
+ double latency;
+ int bytes;
+ } my_stat;
+
+ void add_my_stat(void* base, const void* addme) {
+ my_stat* b = (my_stat*)base;
+ const my_stat* a = (const my_stat*)addme;
+ b->latency += a->latency;
+ b->bytes += a->bytes;
+ }
+
+ void add_proportion_my_stat(double p, void* base, const void* addme) {
+ (my_stat*)result->latency += p * (const my_stat*)base->latency;
+ (my_stat*)result->bytes += p * (const my_stat*)base->bytes;
+ }
+
+ #define kNumIntervals 2
+ #define kMinInterval 0
+ #define kHourInterval 1
+ #define kNumBuckets 4
+
+ const struct census_window_stats_stat_info kMyStatInfo
+ = { sizeof(my_stat), NULL, add_my_stat, add_proportion_my_stat };
+ gpr_timespec intervals[kNumIntervals] = {{60, 0}, {3600, 0}};
+ my_stat stat;
+ my_stat sums[kNumIntervals];
+ census_window_stats_sums result[kNumIntervals];
+ struct census_window_stats* stats
+ = census_window_stats_create(kNumIntervals, intervals, kNumBuckets,
+ &kMyStatInfo);
+ // Record a new event, taking 15.3ms, transferring 1784 bytes.
+ stat.latency = 0.153;
+ stat.bytes = 1784;
+ census_window_stats_add(stats, gpr_now(GPR_CLOCK_REALTIME), &stat);
+ // Get sums and print them out
+ result[kMinInterval].statistic = &sums[kMinInterval];
+ result[kHourInterval].statistic = &sums[kHourInterval];
+ census_window_stats_get_sums(stats, gpr_now(GPR_CLOCK_REALTIME), result);
+ printf("%d events/min, average time %gs, average bytes %g\n",
+ result[kMinInterval].count,
+ (my_stat*)result[kMinInterval].statistic->latency /
+ result[kMinInterval].count,
+ (my_stat*)result[kMinInterval].statistic->bytes /
+ result[kMinInterval].count
+ );
+ printf("%d events/hr, average time %gs, average bytes %g\n",
+ result[kHourInterval].count,
+ (my_stat*)result[kHourInterval].statistic->latency /
+ result[kHourInterval].count,
+ (my_stat*)result[kHourInterval].statistic->bytes /
+ result[kHourInterval].count
+ );
+*/
+
+/* Opaque structure for representing window_stats object */
+struct census_window_stats;
+
+/* Information provided by API user on the information they want to record */
+typedef struct census_window_stats_stat_info {
+ /* Number of bytes in user-defined object. */
+ size_t stat_size;
+ /* Function to initialize a user-defined statistics object. If this is set
+ * to NULL, then the object will be zero-initialized. */
+ void (*stat_initialize)(void *stat);
+ /* Function to add one user-defined statistics object ('addme') to 'base' */
+ void (*stat_add)(void *base, const void *addme);
+ /* As for previous function, but only add a proportion 'p'. This API will
+ currently only use 'p' values in the range [0,1], but other values are
+ possible in the future, and should be supported. */
+ void (*stat_add_proportion)(double p, void *base, const void *addme);
+} census_window_stats_stat_info;
+
+/* Create a new window_stats object. 'nintervals' is the number of
+ 'intervals', and must be >=1. 'granularity' is the number of buckets, with
+ a larger number using more memory, but providing greater accuracy of
+ results. 'granularity should be > 2. We also require that each interval be
+ at least 10 * 'granularity' nanoseconds in size. 'stat_info' contains
+ information about the statistic to be gathered. Intervals greater than ~192
+ years will be treated as essentially infinite in size. This function will
+ GPR_ASSERT() if the object cannot be created or any of the parameters have
+ invalid values. This function is thread-safe. */
+struct census_window_stats *census_window_stats_create(
+ int nintervals, const gpr_timespec intervals[], int granularity,
+ const census_window_stats_stat_info *stat_info);
+
+/* Add a new measurement (in 'stat_value'), as of a given time ('when').
+ This function is thread-compatible. */
+void census_window_stats_add(struct census_window_stats *wstats,
+ const gpr_timespec when, const void *stat_value);
+
+/* Structure used to record a single intervals sum for a given statistic */
+typedef struct census_window_stats_sum {
+ /* Total count of samples. Note that because some internal interpolation
+ is performed, the count of samples returned for each interval may not be an
+ integral value. */
+ double count;
+ /* Sum for statistic */
+ void *statistic;
+} census_window_stats_sums;
+
+/* Retrieve a set of all values stored in a window_stats object 'wstats'. The
+ number of 'sums' MUST be the same as the number 'nintervals' used in
+ census_window_stats_create(). This function is thread-compatible. */
+void census_window_stats_get_sums(const struct census_window_stats *wstats,
+ const gpr_timespec when,
+ struct census_window_stats_sum sums[]);
+
+/* Destroy a window_stats object. Once this function has been called, the
+ object will no longer be usable from any of the above functions (and
+ calling them will most likely result in a NULL-pointer dereference or
+ assertion failure). This function is thread-compatible. */
+void census_window_stats_destroy(struct census_window_stats *wstats);
+
+#endif /* GRPC_CORE_EXT_CENSUS_WINDOW_STATS_H */
diff --git a/src/core/ext/resolver/dns/native/README.md b/src/core/ext/resolver/dns/native/README.md
new file mode 100644
index 0000000000..695de47b9f
--- /dev/null
+++ b/src/core/ext/resolver/dns/native/README.md
@@ -0,0 +1,2 @@
+dns: scheme name resolution, using getaddrbyname
+(or other OS specific implementation)
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
new file mode 100644
index 0000000000..70d8a3fe2d
--- /dev/null
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -0,0 +1,299 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/backoff.h"
+#include "src/core/lib/support/string.h"
+
+#define BACKOFF_MULTIPLIER 1.6
+#define BACKOFF_JITTER 0.2
+#define BACKOFF_MIN_SECONDS 1
+#define BACKOFF_MAX_SECONDS 120
+
+typedef struct {
+ /** base class: must be first */
+ grpc_resolver base;
+ /** refcount */
+ gpr_refcount refs;
+ /** name to resolve */
+ char *name;
+ /** default port to use */
+ char *default_port;
+ /** subchannel factory */
+ grpc_subchannel_factory *subchannel_factory;
+ /** load balancing policy name */
+ char *lb_policy_name;
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** are we currently resolving? */
+ int resolving;
+ /** which version of resolved_config have we published? */
+ int published_version;
+ /** which version of resolved_config is current? */
+ int resolved_version;
+ /** pending next completion, or NULL */
+ grpc_closure *next_completion;
+ /** target config address for next completion */
+ grpc_client_config **target_config;
+ /** current (fully resolved) config */
+ grpc_client_config *resolved_config;
+ /** retry timer */
+ bool have_retry_timer;
+ grpc_timer retry_timer;
+ /** retry backoff state */
+ gpr_backoff backoff_state;
+} dns_resolver;
+
+static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+
+static void dns_start_resolving_locked(dns_resolver *r);
+static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ dns_resolver *r);
+
+static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_client_config **target_config,
+ grpc_closure *on_complete);
+
+static const grpc_resolver_vtable dns_resolver_vtable = {
+ dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
+
+static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
+ dns_resolver *r = (dns_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ if (r->have_retry_timer) {
+ grpc_timer_cancel(exec_ctx, &r->retry_timer);
+ }
+ if (r->next_completion != NULL) {
+ *r->target_config = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ r->next_completion = NULL;
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ dns_resolver *r = (dns_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ if (!r->resolving) {
+ gpr_backoff_reset(&r->backoff_state);
+ dns_start_resolving_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_client_config **target_config,
+ grpc_closure *on_complete) {
+ dns_resolver *r = (dns_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(!r->next_completion);
+ r->next_completion = on_complete;
+ r->target_config = target_config;
+ if (r->resolved_version == 0 && !r->resolving) {
+ gpr_backoff_reset(&r->backoff_state);
+ dns_start_resolving_locked(r);
+ } else {
+ dns_maybe_finish_next_locked(exec_ctx, r);
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
+ bool success) {
+ dns_resolver *r = arg;
+
+ gpr_mu_lock(&r->mu);
+ r->have_retry_timer = false;
+ if (success) {
+ if (!r->resolving) {
+ dns_start_resolving_locked(r);
+ }
+ }
+ gpr_mu_unlock(&r->mu);
+
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
+}
+
+static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_resolved_addresses *addresses) {
+ dns_resolver *r = arg;
+ grpc_client_config *config = NULL;
+ grpc_lb_policy *lb_policy;
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->resolving);
+ r->resolving = 0;
+ if (addresses != NULL) {
+ grpc_lb_policy_args lb_policy_args;
+ config = grpc_client_config_create();
+ memset(&lb_policy_args, 0, sizeof(lb_policy_args));
+ lb_policy_args.addresses = addresses;
+ lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+ if (lb_policy != NULL) {
+ grpc_client_config_set_lb_policy(config, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
+ }
+ grpc_resolved_addresses_destroy(addresses);
+ } else {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
+ gpr_timespec timeout = gpr_time_sub(next_try, now);
+ gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d.%09d seconds",
+ timeout.tv_sec, timeout.tv_nsec);
+ GPR_ASSERT(!r->have_retry_timer);
+ r->have_retry_timer = true;
+ GRPC_RESOLVER_REF(&r->base, "retry-timer");
+ grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r,
+ now);
+ }
+ if (r->resolved_config) {
+ grpc_client_config_unref(exec_ctx, r->resolved_config);
+ }
+ r->resolved_config = config;
+ r->resolved_version++;
+ dns_maybe_finish_next_locked(exec_ctx, r);
+ gpr_mu_unlock(&r->mu);
+
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving");
+}
+
+static void dns_start_resolving_locked(dns_resolver *r) {
+ GRPC_RESOLVER_REF(&r->base, "dns-resolving");
+ GPR_ASSERT(!r->resolving);
+ r->resolving = 1;
+ grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
+}
+
+static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ dns_resolver *r) {
+ if (r->next_completion != NULL &&
+ r->resolved_version != r->published_version) {
+ *r->target_config = r->resolved_config;
+ if (r->resolved_config) {
+ grpc_client_config_ref(r->resolved_config);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ r->next_completion = NULL;
+ r->published_version = r->resolved_version;
+ }
+}
+
+static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
+ dns_resolver *r = (dns_resolver *)gr;
+ gpr_mu_destroy(&r->mu);
+ if (r->resolved_config) {
+ grpc_client_config_unref(exec_ctx, r->resolved_config);
+ }
+ grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ gpr_free(r->name);
+ gpr_free(r->default_port);
+ gpr_free(r->lb_policy_name);
+ gpr_free(r);
+}
+
+static grpc_resolver *dns_create(grpc_resolver_args *args,
+ const char *default_port,
+ const char *lb_policy_name) {
+ dns_resolver *r;
+ const char *path = args->uri->path;
+
+ if (0 != strcmp(args->uri->authority, "")) {
+ gpr_log(GPR_ERROR, "authority based dns uri's not supported");
+ return NULL;
+ }
+
+ if (path[0] == '/') ++path;
+
+ r = gpr_malloc(sizeof(dns_resolver));
+ memset(r, 0, sizeof(*r));
+ gpr_ref_init(&r->refs, 1);
+ gpr_mu_init(&r->mu);
+ grpc_resolver_init(&r->base, &dns_resolver_vtable);
+ r->name = gpr_strdup(path);
+ r->default_port = gpr_strdup(default_port);
+ r->subchannel_factory = args->subchannel_factory;
+ gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
+ BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
+ grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->lb_policy_name = gpr_strdup(lb_policy_name);
+ return &r->base;
+}
+
+/*
+ * FACTORY
+ */
+
+static void dns_factory_ref(grpc_resolver_factory *factory) {}
+
+static void dns_factory_unref(grpc_resolver_factory *factory) {}
+
+static grpc_resolver *dns_factory_create_resolver(
+ grpc_resolver_factory *factory, grpc_resolver_args *args) {
+ return dns_create(args, "https", "pick_first");
+}
+
+static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,
+ grpc_uri *uri) {
+ const char *path = uri->path;
+ if (path[0] == '/') ++path;
+ return gpr_strdup(path);
+}
+
+static const grpc_resolver_factory_vtable dns_factory_vtable = {
+ dns_factory_ref, dns_factory_unref, dns_factory_create_resolver,
+ dns_factory_get_default_host_name, "dns"};
+static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable};
+
+static grpc_resolver_factory *dns_resolver_factory_create() {
+ return &dns_resolver_factory;
+}
+
+void grpc_resolver_dns_native_init(void) {
+ grpc_register_resolver_type(dns_resolver_factory_create());
+}
+
+void grpc_resolver_dns_native_shutdown(void) {}
diff --git a/src/core/ext/resolver/sockaddr/README.md b/src/core/ext/resolver/sockaddr/README.md
new file mode 100644
index 0000000000..e307ba88f5
--- /dev/null
+++ b/src/core/ext/resolver/sockaddr/README.md
@@ -0,0 +1 @@
+Support for resolving ipv4:, ipv6:, unix: schemes
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
new file mode 100644
index 0000000000..fed9d20392
--- /dev/null
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -0,0 +1,363 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/unix_sockets_posix.h"
+#include "src/core/lib/support/string.h"
+
+typedef struct {
+ /** base class: must be first */
+ grpc_resolver base;
+ /** refcount */
+ gpr_refcount refs;
+ /** subchannel factory */
+ grpc_subchannel_factory *subchannel_factory;
+ /** load balancing policy name */
+ char *lb_policy_name;
+
+ /** the addresses that we've 'resolved' */
+ grpc_resolved_addresses *addresses;
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** have we published? */
+ int published;
+ /** pending next completion, or NULL */
+ grpc_closure *next_completion;
+ /** target config address for next completion */
+ grpc_client_config **target_config;
+} sockaddr_resolver;
+
+static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+
+static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ sockaddr_resolver *r);
+
+static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_client_config **target_config,
+ grpc_closure *on_complete);
+
+static const grpc_resolver_vtable sockaddr_resolver_vtable = {
+ sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
+ sockaddr_next};
+
+static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ if (r->next_completion != NULL) {
+ *r->target_config = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ r->next_completion = NULL;
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ r->published = 0;
+ sockaddr_maybe_finish_next_locked(exec_ctx, r);
+ gpr_mu_unlock(&r->mu);
+}
+
+static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_client_config **target_config,
+ grpc_closure *on_complete) {
+ sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(!r->next_completion);
+ r->next_completion = on_complete;
+ r->target_config = target_config;
+ sockaddr_maybe_finish_next_locked(exec_ctx, r);
+ gpr_mu_unlock(&r->mu);
+}
+
+static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ sockaddr_resolver *r) {
+ if (r->next_completion != NULL && !r->published) {
+ grpc_client_config *cfg = grpc_client_config_create();
+ grpc_lb_policy_args lb_policy_args;
+ memset(&lb_policy_args, 0, sizeof(lb_policy_args));
+ lb_policy_args.addresses = r->addresses;
+ lb_policy_args.subchannel_factory = r->subchannel_factory;
+ grpc_lb_policy *lb_policy =
+ grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+ grpc_client_config_set_lb_policy(cfg, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
+ r->published = 1;
+ *r->target_config = cfg;
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ r->next_completion = NULL;
+ }
+}
+
+static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
+ sockaddr_resolver *r = (sockaddr_resolver *)gr;
+ gpr_mu_destroy(&r->mu);
+ grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_resolved_addresses_destroy(r->addresses);
+ gpr_free(r->lb_policy_name);
+ gpr_free(r);
+}
+
+static char *ip_get_default_authority(grpc_uri *uri) {
+ const char *path = uri->path;
+ if (path[0] == '/') ++path;
+ return gpr_strdup(path);
+}
+
+static char *ipv4_get_default_authority(grpc_resolver_factory *factory,
+ grpc_uri *uri) {
+ return ip_get_default_authority(uri);
+}
+
+static char *ipv6_get_default_authority(grpc_resolver_factory *factory,
+ grpc_uri *uri) {
+ return ip_get_default_authority(uri);
+}
+
+static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr,
+ size_t *len) {
+ const char *host_port = uri->path;
+ char *host;
+ char *port;
+ int port_num;
+ int result = 0;
+ struct sockaddr_in *in = (struct sockaddr_in *)addr;
+
+ if (*host_port == '/') ++host_port;
+ if (!gpr_split_host_port(host_port, &host, &port)) {
+ return 0;
+ }
+
+ memset(in, 0, sizeof(*in));
+ *len = sizeof(*in);
+ in->sin_family = AF_INET;
+ if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
+ goto done;
+ }
+
+ if (port != NULL) {
+ if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
+ port_num > 65535) {
+ gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
+ goto done;
+ }
+ in->sin_port = htons((uint16_t)port_num);
+ } else {
+ gpr_log(GPR_ERROR, "no port given for ipv4 scheme");
+ goto done;
+ }
+
+ result = 1;
+done:
+ gpr_free(host);
+ gpr_free(port);
+ return result;
+}
+
+static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr,
+ size_t *len) {
+ const char *host_port = uri->path;
+ char *host;
+ char *port;
+ int port_num;
+ int result = 0;
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr;
+
+ if (*host_port == '/') ++host_port;
+ if (!gpr_split_host_port(host_port, &host, &port)) {
+ return 0;
+ }
+
+ memset(in6, 0, sizeof(*in6));
+ *len = sizeof(*in6);
+ in6->sin6_family = AF_INET6;
+ if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
+ goto done;
+ }
+
+ if (port != NULL) {
+ if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
+ port_num > 65535) {
+ gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
+ goto done;
+ }
+ in6->sin6_port = htons((uint16_t)port_num);
+ } else {
+ gpr_log(GPR_ERROR, "no port given for ipv6 scheme");
+ goto done;
+ }
+
+ result = 1;
+done:
+ gpr_free(host);
+ gpr_free(port);
+ return result;
+}
+
+static void do_nothing(void *ignored) {}
+
+static grpc_resolver *sockaddr_create(
+ grpc_resolver_args *args, const char *default_lb_policy_name,
+ int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
+ int errors_found = 0; /* GPR_FALSE */
+ sockaddr_resolver *r;
+ gpr_slice path_slice;
+ gpr_slice_buffer path_parts;
+
+ if (0 != strcmp(args->uri->authority, "")) {
+ gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
+ args->uri->scheme);
+ return NULL;
+ }
+
+ r = gpr_malloc(sizeof(sockaddr_resolver));
+ memset(r, 0, sizeof(*r));
+
+ r->lb_policy_name =
+ gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
+ const char *lb_enabled_qpart =
+ grpc_uri_get_query_arg(args->uri, "lb_enabled");
+ /* anything other than "0" is interpreted as true */
+ const bool lb_enabled =
+ (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
+
+ if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
+ !lb_enabled) {
+ /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
+ * out, as this is meant mostly for tests. */
+ gpr_log(GPR_ERROR,
+ "Requested 'grpclb' LB policy but resolved addresses don't "
+ "support load balancing.");
+ abort();
+ }
+
+ if (r->lb_policy_name == NULL) {
+ r->lb_policy_name = gpr_strdup(default_lb_policy_name);
+ }
+
+ path_slice =
+ gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
+ gpr_slice_buffer_init(&path_parts);
+
+ gpr_slice_split(path_slice, ",", &path_parts);
+ r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->addresses->naddrs = path_parts.count;
+ r->addresses->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs);
+
+ for (size_t i = 0; i < r->addresses->naddrs; i++) {
+ grpc_uri ith_uri = *args->uri;
+ char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
+ ith_uri.path = part_str;
+ if (!parse(&ith_uri,
+ (struct sockaddr_storage *)(&r->addresses->addrs[i].addr),
+ &r->addresses->addrs[i].len)) {
+ errors_found = 1; /* GPR_TRUE */
+ }
+ gpr_free(part_str);
+ if (errors_found) break;
+ }
+
+ gpr_slice_buffer_destroy(&path_parts);
+ gpr_slice_unref(path_slice);
+ if (errors_found) {
+ gpr_free(r->lb_policy_name);
+ grpc_resolved_addresses_destroy(r->addresses);
+ gpr_free(r);
+ return NULL;
+ }
+
+ gpr_ref_init(&r->refs, 1);
+ gpr_mu_init(&r->mu);
+ grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
+ r->subchannel_factory = args->subchannel_factory;
+ grpc_subchannel_factory_ref(r->subchannel_factory);
+
+ return &r->base;
+}
+
+/*
+ * FACTORY
+ */
+
+static void sockaddr_factory_ref(grpc_resolver_factory *factory) {}
+
+static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
+
+#define DECL_FACTORY(name, prefix) \
+ static grpc_resolver *name##_factory_create_resolver( \
+ grpc_resolver_factory *factory, grpc_resolver_args *args) { \
+ return sockaddr_create(args, "pick_first", prefix##parse_##name); \
+ } \
+ static const grpc_resolver_factory_vtable name##_factory_vtable = { \
+ sockaddr_factory_ref, sockaddr_factory_unref, \
+ name##_factory_create_resolver, prefix##name##_get_default_authority, \
+ #name}; \
+ static grpc_resolver_factory name##_resolver_factory = { \
+ &name##_factory_vtable}
+
+#ifdef GPR_HAVE_UNIX_SOCKET
+DECL_FACTORY(unix, grpc_);
+#endif
+DECL_FACTORY(ipv4, );
+DECL_FACTORY(ipv6, );
+
+void grpc_resolver_sockaddr_init(void) {
+ grpc_register_resolver_type(&ipv4_resolver_factory);
+ grpc_register_resolver_type(&ipv6_resolver_factory);
+#ifdef GPR_HAVE_UNIX_SOCKET
+ grpc_register_resolver_type(&unix_resolver_factory);
+#endif
+}
+
+void grpc_resolver_sockaddr_shutdown(void) {}
diff --git a/src/core/ext/resolver/zookeeper/README.md b/src/core/ext/resolver/zookeeper/README.md
new file mode 100644
index 0000000000..ce6f39683b
--- /dev/null
+++ b/src/core/ext/resolver/zookeeper/README.md
@@ -0,0 +1 @@
+Zookeeper based name resolver: WIP
diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
new file mode 100644
index 0000000000..5acb0940c6
--- /dev/null
+++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
@@ -0,0 +1,512 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include <grpc/grpc_zookeeper.h>
+#include <zookeeper/zookeeper.h>
+
+#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/json/json.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/api_trace.h"
+
+/** Zookeeper session expiration time in milliseconds */
+#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000
+
+typedef struct {
+ /** base class: must be first */
+ grpc_resolver base;
+ /** refcount */
+ gpr_refcount refs;
+ /** name to resolve */
+ char *name;
+ /** subchannel factory */
+ grpc_subchannel_factory *subchannel_factory;
+ /** load balancing policy name */
+ char *lb_policy_name;
+
+ /** mutex guarding the rest of the state */
+ gpr_mu mu;
+ /** are we currently resolving? */
+ int resolving;
+ /** which version of resolved_config have we published? */
+ int published_version;
+ /** which version of resolved_config is current? */
+ int resolved_version;
+ /** pending next completion, or NULL */
+ grpc_closure *next_completion;
+ /** target config address for next completion */
+ grpc_client_config **target_config;
+ /** current (fully resolved) config */
+ grpc_client_config *resolved_config;
+
+ /** zookeeper handle */
+ zhandle_t *zookeeper_handle;
+ /** zookeeper resolved addresses */
+ grpc_resolved_addresses *resolved_addrs;
+ /** total number of addresses to be resolved */
+ int resolved_total;
+ /** number of addresses resolved */
+ int resolved_num;
+} zookeeper_resolver;
+
+static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+
+static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
+static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ zookeeper_resolver *r);
+
+static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_client_config **target_config,
+ grpc_closure *on_complete);
+
+static const grpc_resolver_vtable zookeeper_resolver_vtable = {
+ zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
+ zookeeper_next};
+
+static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ grpc_closure *call = NULL;
+ gpr_mu_lock(&r->mu);
+ if (r->next_completion != NULL) {
+ *r->target_config = NULL;
+ call = r->next_completion;
+ r->next_completion = NULL;
+ }
+ zookeeper_close(r->zookeeper_handle);
+ gpr_mu_unlock(&r->mu);
+ if (call != NULL) {
+ call->cb(exec_ctx, call->cb_arg, 1);
+ }
+}
+
+static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ if (r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_client_config **target_config,
+ grpc_closure *on_complete) {
+ zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->next_completion == NULL);
+ r->next_completion = on_complete;
+ r->target_config = target_config;
+ if (r->resolved_version == 0 && r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ } else {
+ zookeeper_maybe_finish_next_locked(exec_ctx, r);
+ }
+ gpr_mu_unlock(&r->mu);
+}
+
+/** Zookeeper global watcher for connection management
+ TODO: better connection management besides logs */
+static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type,
+ int state, const char *path,
+ void *watcher_ctx) {
+ if (type == ZOO_SESSION_EVENT) {
+ if (state == ZOO_EXPIRED_SESSION_STATE) {
+ gpr_log(GPR_ERROR, "Zookeeper session expired");
+ } else if (state == ZOO_AUTH_FAILED_STATE) {
+ gpr_log(GPR_ERROR, "Zookeeper authentication failed");
+ }
+ }
+}
+
+/** Zookeeper watcher triggered by changes to watched nodes
+ Once triggered, it tries to resolve again to get updated addresses */
+static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
+ const char *path, void *watcher_ctx) {
+ if (watcher_ctx != NULL) {
+ zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx;
+ if (state == ZOO_CONNECTED_STATE) {
+ gpr_mu_lock(&r->mu);
+ if (r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+ }
+ }
+}
+
+/** Callback function after getting all resolved addresses
+ Creates a subchannel for each address */
+static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_resolved_addresses *addresses) {
+ zookeeper_resolver *r = arg;
+ grpc_client_config *config = NULL;
+ grpc_lb_policy *lb_policy;
+
+ if (addresses != NULL) {
+ grpc_lb_policy_args lb_policy_args;
+ config = grpc_client_config_create();
+
+ lb_policy_args.addresses = addresses;
+ lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+
+ if (lb_policy != NULL) {
+ grpc_client_config_set_lb_policy(config, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
+ }
+ grpc_resolved_addresses_destroy(addresses);
+ }
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->resolving == 1);
+ r->resolving = 0;
+ if (r->resolved_config != NULL) {
+ grpc_client_config_unref(exec_ctx, r->resolved_config);
+ }
+ r->resolved_config = config;
+ r->resolved_version++;
+ zookeeper_maybe_finish_next_locked(exec_ctx, r);
+ gpr_mu_unlock(&r->mu);
+
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "zookeeper-resolving");
+}
+
+/** Callback function for each DNS resolved address */
+static void zookeeper_dns_resolved(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_resolved_addresses *addresses) {
+ size_t i;
+ zookeeper_resolver *r = arg;
+ int resolve_done = 0;
+
+ gpr_mu_lock(&r->mu);
+ r->resolved_num++;
+ r->resolved_addrs->addrs =
+ gpr_realloc(r->resolved_addrs->addrs,
+ sizeof(grpc_resolved_address) *
+ (r->resolved_addrs->naddrs + addresses->naddrs));
+ for (i = 0; i < addresses->naddrs; i++) {
+ memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr,
+ addresses->addrs[i].addr, addresses->addrs[i].len);
+ r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len =
+ addresses->addrs[i].len;
+ }
+
+ r->resolved_addrs->naddrs += addresses->naddrs;
+ grpc_resolved_addresses_destroy(addresses);
+
+ /** Wait for all addresses to be resolved */
+ resolve_done = (r->resolved_num == r->resolved_total);
+ gpr_mu_unlock(&r->mu);
+ if (resolve_done) {
+ zookeeper_on_resolved(exec_ctx, r, r->resolved_addrs);
+ }
+}
+
+/** Parses JSON format address of a zookeeper node */
+static char *zookeeper_parse_address(const char *value, size_t value_len) {
+ grpc_json *json;
+ grpc_json *cur;
+ const char *host;
+ const char *port;
+ char *buffer;
+ char *address = NULL;
+
+ buffer = gpr_malloc(value_len);
+ memcpy(buffer, value, value_len);
+ json = grpc_json_parse_string_with_len(buffer, value_len);
+ if (json != NULL) {
+ host = NULL;
+ port = NULL;
+ for (cur = json->child; cur != NULL; cur = cur->next) {
+ if (!strcmp(cur->key, "host")) {
+ host = cur->value;
+ if (port != NULL) {
+ break;
+ }
+ } else if (!strcmp(cur->key, "port")) {
+ port = cur->value;
+ if (host != NULL) {
+ break;
+ }
+ }
+ }
+ if (host != NULL && port != NULL) {
+ gpr_asprintf(&address, "%s:%s", host, port);
+ }
+ grpc_json_destroy(json);
+ }
+ gpr_free(buffer);
+
+ return address;
+}
+
+static void zookeeper_get_children_node_completion(int rc, const char *value,
+ int value_len,
+ const struct Stat *stat,
+ const void *arg) {
+ char *address = NULL;
+ zookeeper_resolver *r = (zookeeper_resolver *)arg;
+ int resolve_done = 0;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ if (rc != 0) {
+ gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return;
+ }
+
+ address = zookeeper_parse_address(value, (size_t)value_len);
+ if (address != NULL) {
+ /** Further resolves address by DNS */
+ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ gpr_free(address);
+ } else {
+ gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name);
+ gpr_mu_lock(&r->mu);
+ r->resolved_total--;
+ resolve_done = (r->resolved_num == r->resolved_total);
+ gpr_mu_unlock(&r->mu);
+ if (resolve_done) {
+ zookeeper_on_resolved(&exec_ctx, r, r->resolved_addrs);
+ }
+ }
+
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void zookeeper_get_children_completion(
+ int rc, const struct String_vector *children, const void *arg) {
+ char *path;
+ int status;
+ int i;
+ zookeeper_resolver *r = (zookeeper_resolver *)arg;
+
+ if (rc != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
+ return;
+ }
+
+ if (children->count == 0) {
+ gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name);
+ return;
+ }
+
+ r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->resolved_addrs->addrs = NULL;
+ r->resolved_addrs->naddrs = 0;
+ r->resolved_total = children->count;
+
+ /** TODO: Replace expensive heap allocation with stack
+ if we can get maximum length of zookeeper path */
+ for (i = 0; i < children->count; i++) {
+ gpr_asprintf(&path, "%s/%s", r->name, children->data[i]);
+ status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r,
+ zookeeper_get_children_node_completion, r);
+ gpr_free(path);
+ if (status != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
+ }
+ }
+}
+
+static void zookeeper_get_node_completion(int rc, const char *value,
+ int value_len,
+ const struct Stat *stat,
+ const void *arg) {
+ int status;
+ char *address = NULL;
+ zookeeper_resolver *r = (zookeeper_resolver *)arg;
+ r->resolved_addrs = NULL;
+ r->resolved_total = 0;
+ r->resolved_num = 0;
+
+ if (rc != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
+ return;
+ }
+
+ /** If zookeeper node of path r->name does not have address
+ (i.e. service node), get its children */
+ address = zookeeper_parse_address(value, (size_t)value_len);
+ if (address != NULL) {
+ r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->resolved_addrs->addrs = NULL;
+ r->resolved_addrs->naddrs = 0;
+ r->resolved_total = 1;
+ /** Further resolves address by DNS */
+ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ gpr_free(address);
+ return;
+ }
+
+ status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher,
+ r, zookeeper_get_children_completion, r);
+ if (status != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
+ }
+}
+
+static void zookeeper_resolve_address(zookeeper_resolver *r) {
+ int status;
+ status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r,
+ zookeeper_get_node_completion, r);
+ if (status != 0) {
+ gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
+ }
+}
+
+static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
+ GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving");
+ GPR_ASSERT(r->resolving == 0);
+ r->resolving = 1;
+ zookeeper_resolve_address(r);
+}
+
+static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ zookeeper_resolver *r) {
+ if (r->next_completion != NULL &&
+ r->resolved_version != r->published_version) {
+ *r->target_config = r->resolved_config;
+ if (r->resolved_config != NULL) {
+ grpc_client_config_ref(r->resolved_config);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ r->next_completion = NULL;
+ r->published_version = r->resolved_version;
+ }
+}
+
+static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
+ zookeeper_resolver *r = (zookeeper_resolver *)gr;
+ gpr_mu_destroy(&r->mu);
+ if (r->resolved_config != NULL) {
+ grpc_client_config_unref(exec_ctx, r->resolved_config);
+ }
+ grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ gpr_free(r->name);
+ gpr_free(r->lb_policy_name);
+ gpr_free(r);
+}
+
+static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
+ const char *lb_policy_name) {
+ zookeeper_resolver *r;
+ size_t length;
+ char *path = args->uri->path;
+
+ if (0 == strcmp(args->uri->authority, "")) {
+ gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
+ return NULL;
+ }
+
+ /** Removes the trailing slash if exists */
+ length = strlen(path);
+ if (length > 1 && path[length - 1] == '/') {
+ path[length - 1] = 0;
+ }
+
+ r = gpr_malloc(sizeof(zookeeper_resolver));
+ memset(r, 0, sizeof(*r));
+ gpr_ref_init(&r->refs, 1);
+ gpr_mu_init(&r->mu);
+ grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
+ r->name = gpr_strdup(path);
+
+ r->subchannel_factory = args->subchannel_factory;
+ grpc_subchannel_factory_ref(r->subchannel_factory);
+
+ r->lb_policy_name = gpr_strdup(lb_policy_name);
+
+ /** Initializes zookeeper client */
+ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
+ r->zookeeper_handle =
+ zookeeper_init(args->uri->authority, zookeeper_global_watcher,
+ GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
+ if (r->zookeeper_handle == NULL) {
+ gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
+ return NULL;
+ }
+
+ return &r->base;
+}
+
+/*
+ * FACTORY
+ */
+
+static void zookeeper_factory_ref(grpc_resolver_factory *factory) {}
+
+static void zookeeper_factory_unref(grpc_resolver_factory *factory) {}
+
+static char *zookeeper_factory_get_default_hostname(
+ grpc_resolver_factory *factory, grpc_uri *uri) {
+ return NULL;
+}
+
+static grpc_resolver *zookeeper_factory_create_resolver(
+ grpc_resolver_factory *factory, grpc_resolver_args *args) {
+ return zookeeper_create(args, "pick_first");
+}
+
+static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
+ zookeeper_factory_ref, zookeeper_factory_unref,
+ zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname,
+ "zookeeper"};
+
+static grpc_resolver_factory zookeeper_resolver_factory = {
+ &zookeeper_factory_vtable};
+
+static grpc_resolver_factory *zookeeper_resolver_factory_create() {
+ return &zookeeper_resolver_factory;
+}
+
+static void zookeeper_plugin_init() {
+ grpc_register_resolver_type(zookeeper_resolver_factory_create());
+}
+
+void grpc_zookeeper_register() {
+ GRPC_API_TRACE("grpc_zookeeper_register(void)", 0, ());
+ grpc_register_plugin(zookeeper_plugin_init, NULL);
+}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index a5f7c6cfbc..606fff5fb4 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -40,8 +40,8 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
+#include "src/core/ext/census/grpc_filter.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
-#include "src/core/lib/census/grpc_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/compress_filter.h"