aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/statistics/window_stats.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/statistics/window_stats.c')
-rw-r--r--src/core/statistics/window_stats.c317
1 files changed, 317 insertions, 0 deletions
diff --git a/src/core/statistics/window_stats.c b/src/core/statistics/window_stats.c
new file mode 100644
index 0000000000..be53d818a0
--- /dev/null
+++ b/src/core/statistics/window_stats.c
@@ -0,0 +1,317 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/statistics/window_stats.h"
+#include <math.h>
+#include <stddef.h>
+#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+
+/* typedefs make typing long names easier. Use cws (for census_window_stats) */
+typedef census_window_stats_stat_info cws_stat_info;
+typedef struct census_window_stats_sum cws_sum;
+
+/* Each interval is composed of a number of buckets, which hold a count of
+ entries and a single statistic */
+typedef struct census_window_stats_bucket {
+ gpr_int64 count;
+ void* statistic;
+} cws_bucket;
+
+/* Each interval has a set of buckets, and the variables needed to keep
+ track of their current state */
+typedef struct census_window_stats_interval_stats {
+ /* The buckets. There will be 'granularity' + 1 of these. */
+ cws_bucket* buckets;
+ /* Index of the bucket containing the smallest time interval. */
+ int bottom_bucket;
+ /* The smallest time storable in the current window. */
+ gpr_int64 bottom;
+ /* The largest time storable in the current window + 1ns */
+ gpr_int64 top;
+ /* The width of each bucket in ns. */
+ gpr_int64 width;
+} cws_interval_stats;
+
+typedef struct census_window_stats {
+ /* Number of intervals. */
+ int nintervals;
+ /* Number of buckets in each interval. 'granularity' + 1. */
+ int nbuckets;
+ /* Record of stat_info. */
+ cws_stat_info stat_info;
+ /* Stats for each interval. */
+ cws_interval_stats* interval_stats;
+ /* The time the newset stat was recorded. */
+ gpr_int64 newest_time;
+} window_stats;
+
+/* Calculate an actual bucket index from a logical index 'IDX'. Other
+ parameters supply information on the interval struct and overall stats. */
+#define BUCKET_IDX(IS, IDX, WSTATS) \
+ ((IS->bottom_bucket + (IDX)) % WSTATS->nbuckets)
+
+/* The maximum seconds value we can have in a valid timespec. More than this
+ will result in overflow in timespec_to_ns(). This works out to ~292 years.
+ TODO: consider using doubles instead of int64. */
+static gpr_int64 max_seconds =
+ (GPR_INT64_MAX - GPR_NS_PER_SEC) / GPR_NS_PER_SEC;
+
+static gpr_int64 timespec_to_ns(const gpr_timespec ts) {
+ if (ts.tv_sec > max_seconds) {
+ return GPR_INT64_MAX - 1;
+ }
+ return (gpr_int64)ts.tv_sec * GPR_NS_PER_SEC + ts.tv_nsec;
+}
+
+static void cws_initialize_statistic(void* statistic,
+ const cws_stat_info* stat_info) {
+ if (stat_info->stat_initialize == NULL) {
+ memset(statistic, 0, stat_info->stat_size);
+ } else {
+ stat_info->stat_initialize(statistic);
+ }
+}
+
+/* Create and initialize a statistic */
+static void* cws_create_statistic(const cws_stat_info* stat_info) {
+ void* stat = gpr_malloc(stat_info->stat_size);
+ cws_initialize_statistic(stat, stat_info);
+ return stat;
+}
+
+window_stats* census_window_stats_create(int nintervals,
+ const gpr_timespec intervals[],
+ int granularity,
+ const cws_stat_info* stat_info) {
+ window_stats* ret;
+ int i;
+ /* validate inputs */
+ GPR_ASSERT(nintervals > 0 && granularity > 2 && intervals != NULL &&
+ stat_info != NULL);
+ for (i = 0; i < nintervals; i++) {
+ gpr_int64 ns = timespec_to_ns(intervals[i]);
+ GPR_ASSERT(intervals[i].tv_sec >= 0 && intervals[i].tv_nsec >= 0 &&
+ intervals[i].tv_nsec < GPR_NS_PER_SEC && ns >= 100 &&
+ granularity * 10 <= ns);
+ }
+ /* Allocate and initialize relevant data structures */
+ ret = (window_stats*)gpr_malloc(sizeof(window_stats));
+ ret->nintervals = nintervals;
+ ret->nbuckets = granularity + 1;
+ ret->stat_info = *stat_info;
+ ret->interval_stats =
+ (cws_interval_stats*)gpr_malloc(nintervals * sizeof(cws_interval_stats));
+ for (i = 0; i < nintervals; i++) {
+ gpr_int64 size_ns = timespec_to_ns(intervals[i]);
+ cws_interval_stats* is = ret->interval_stats + i;
+ cws_bucket* buckets = is->buckets =
+ (cws_bucket*)gpr_malloc(ret->nbuckets * sizeof(cws_bucket));
+ int b;
+ for (b = 0; b < ret->nbuckets; b++) {
+ buckets[b].statistic = cws_create_statistic(stat_info);
+ buckets[b].count = 0;
+ }
+ is->bottom_bucket = 0;
+ is->bottom = 0;
+ is->width = size_ns / granularity;
+ /* Check for possible overflow issues, and maximize interval size if the
+ user requested something large enough. */
+ if (GPR_INT64_MAX - is->width > size_ns) {
+ is->top = size_ns + is->width;
+ } else {
+ is->top = GPR_INT64_MAX;
+ is->width = GPR_INT64_MAX / (granularity + 1);
+ }
+ /* If size doesn't divide evenly, we can have a width slightly too small;
+ better to have it slightly large. */
+ if ((size_ns - (granularity + 1) * is->width) > 0) {
+ is->width += 1;
+ }
+ }
+ ret->newest_time = 0;
+ return ret;
+}
+
+/* When we try adding a measurement above the current interval range, we
+ need to "shift" the buckets sufficiently to cover the new range. */
+static void cws_shift_buckets(const window_stats* wstats,
+ cws_interval_stats* is, gpr_int64 when_ns) {
+ int i;
+ /* number of bucket time widths to "shift" */
+ int shift;
+ /* number of buckets to clear */
+ int nclear;
+ GPR_ASSERT(when_ns >= is->top);
+ /* number of bucket time widths to "shift" */
+ shift = ((when_ns - is->top) / is->width) + 1;
+ /* number of buckets to clear - limited by actual number of buckets */
+ nclear = GPR_MIN(shift, wstats->nbuckets);
+ for (i = 0; i < nclear; i++) {
+ int b = BUCKET_IDX(is, i, wstats);
+ is->buckets[b].count = 0;
+ cws_initialize_statistic(is->buckets[b].statistic, &wstats->stat_info);
+ }
+ /* adjust top/bottom times and current bottom bucket */
+ is->bottom_bucket = BUCKET_IDX(is, shift, wstats);
+ is->top += shift * is->width;
+ is->bottom += shift * is->width;
+}
+
+void census_window_stats_add(window_stats* wstats, const gpr_timespec when,
+ const void* stat_value) {
+ int i;
+ gpr_int64 when_ns = timespec_to_ns(when);
+ GPR_ASSERT(wstats->interval_stats != NULL);
+ for (i = 0; i < wstats->nintervals; i++) {
+ cws_interval_stats* is = wstats->interval_stats + i;
+ cws_bucket* bucket;
+ if (when_ns < is->bottom) { /* Below smallest time in interval: drop */
+ continue;
+ }
+ if (when_ns >= is->top) { /* above limit: shift buckets */
+ cws_shift_buckets(wstats, is, when_ns);
+ }
+ /* Add the stat. */
+ GPR_ASSERT(is->bottom <= when_ns && when_ns < is->top);
+ bucket = is->buckets +
+ BUCKET_IDX(is, (when_ns - is->bottom) / is->width, wstats);
+ bucket->count++;
+ wstats->stat_info.stat_add(bucket->statistic, stat_value);
+ }
+ if (when_ns > wstats->newest_time) {
+ wstats->newest_time = when_ns;
+ }
+}
+
+/* Add a specific bucket contents to an accumulating total. */
+static void cws_add_bucket_to_sum(cws_sum* sum, const cws_bucket* bucket,
+ const cws_stat_info* stat_info) {
+ sum->count += bucket->count;
+ stat_info->stat_add(sum->statistic, bucket->statistic);
+}
+
+/* Add a proportion to an accumulating sum. */
+static void cws_add_proportion_to_sum(double p, cws_sum* sum,
+ const cws_bucket* bucket,
+ const cws_stat_info* stat_info) {
+ sum->count += p * bucket->count;
+ stat_info->stat_add_proportion(p, sum->statistic, bucket->statistic);
+}
+
+void census_window_stats_get_sums(const window_stats* wstats,
+ const gpr_timespec when, cws_sum sums[]) {
+ int i;
+ gpr_int64 when_ns = timespec_to_ns(when);
+ GPR_ASSERT(wstats->interval_stats != NULL);
+ for (i = 0; i < wstats->nintervals; i++) {
+ int when_bucket;
+ int new_bucket;
+ double last_proportion = 1.0;
+ double bottom_proportion;
+ cws_interval_stats* is = wstats->interval_stats + i;
+ cws_sum* sum = sums + i;
+ sum->count = 0;
+ cws_initialize_statistic(sum->statistic, &wstats->stat_info);
+ if (when_ns < is->bottom) {
+ continue;
+ }
+ if (when_ns >= is->top) {
+ cws_shift_buckets(wstats, is, when_ns);
+ }
+ /* Calculating the appropriate amount of which buckets to use can get
+ complicated. Essentially there are two cases:
+ 1) if the "top" bucket (new_bucket, where the newest additions to the
+ stats recorded are entered) corresponds to 'when', then we need
+ to take a proportion of it - (if when < newest_time) or the full
+ thing. We also (possibly) need to take a corresponding
+ proportion of the bottom bucket.
+ 2) Other cases, we just take a straight proportion.
+ */
+ when_bucket = (when_ns - is->bottom) / is->width;
+ new_bucket = (wstats->newest_time - is->bottom) / is->width;
+ if (new_bucket == when_bucket) {
+ gpr_int64 bottom_bucket_time = is->bottom + when_bucket * is->width;
+ if (when_ns < wstats->newest_time) {
+ last_proportion = (double)(when_ns - bottom_bucket_time) /
+ (double)(wstats->newest_time - bottom_bucket_time);
+ bottom_proportion =
+ (double)(is->width - (when_ns - bottom_bucket_time)) / is->width;
+ } else {
+ bottom_proportion =
+ (double)(is->width - (wstats->newest_time - bottom_bucket_time)) /
+ is->width;
+ }
+ } else {
+ last_proportion =
+ (double)(when_ns + 1 - is->bottom - when_bucket * is->width) /
+ is->width;
+ bottom_proportion = 1.0 - last_proportion;
+ }
+ cws_add_proportion_to_sum(last_proportion, sum,
+ is->buckets + BUCKET_IDX(is, when_bucket, wstats),
+ &wstats->stat_info);
+ if (when_bucket != 0) { /* last bucket isn't also bottom bucket */
+ int b;
+ /* Add all of "bottom" bucket if we are looking at a subset of the
+ full interval, or a proportion if we are adding full interval. */
+ cws_add_proportion_to_sum(
+ (when_bucket == wstats->nbuckets - 1 ? bottom_proportion : 1.0), sum,
+ is->buckets + is->bottom_bucket, &wstats->stat_info);
+ /* Add all the remaining buckets (everything but top and bottom). */
+ for (b = 1; b < when_bucket; b++) {
+ cws_add_bucket_to_sum(sum, is->buckets + BUCKET_IDX(is, b, wstats),
+ &wstats->stat_info);
+ }
+ }
+ }
+}
+
+void census_window_stats_destroy(window_stats* wstats) {
+ int i;
+ GPR_ASSERT(wstats->interval_stats != NULL);
+ for (i = 0; i < wstats->nintervals; i++) {
+ int b;
+ for (b = 0; b < wstats->nbuckets; b++) {
+ gpr_free(wstats->interval_stats[i].buckets[b].statistic);
+ }
+ gpr_free(wstats->interval_stats[i].buckets);
+ }
+ gpr_free(wstats->interval_stats);
+ /* Ensure any use-after free triggers assert. */
+ wstats->interval_stats = NULL;
+ gpr_free(wstats);
+}