aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/iomgr/closure.c8
-rw-r--r--src/core/lib/iomgr/closure.h5
-rw-r--r--src/core/lib/iomgr/resource_quota.c32
-rw-r--r--src/core/lib/iomgr/resource_quota.h6
-rw-r--r--src/core/lib/support/log_posix.c12
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/transport/bdp_estimator.c104
-rw-r--r--src/core/lib/transport/bdp_estimator.h76
-rw-r--r--src/core/lib/transport/pid_controller.c36
-rw-r--r--src/core/lib/transport/pid_controller.h17
10 files changed, 277 insertions, 21 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 8e4efd3370..509c1ff95d 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -51,20 +51,22 @@ void grpc_closure_list_init(grpc_closure_list *closure_list) {
closure_list->head = closure_list->tail = NULL;
}
-void grpc_closure_list_append(grpc_closure_list *closure_list,
+bool grpc_closure_list_append(grpc_closure_list *closure_list,
grpc_closure *closure, grpc_error *error) {
if (closure == NULL) {
GRPC_ERROR_UNREF(error);
- return;
+ return false;
}
closure->error_data.error = error;
closure->next_data.next = NULL;
- if (closure_list->head == NULL) {
+ bool was_empty = (closure_list->head == NULL);
+ if (was_empty) {
closure_list->head = closure;
} else {
closure_list->tail->next_data.next = closure;
}
closure_list->tail = closure;
+ return was_empty;
}
void grpc_closure_list_fail_all(grpc_closure_list *list,
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 6748b21e59..2510d50b42 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -116,8 +116,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
void grpc_closure_list_init(grpc_closure_list *list);
/** add \a closure to the end of \a list
- and set \a closure's result to \a error */
-void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
+ and set \a closure's result to \a error
+ Returns true if \a list becomes non-empty */
+bool grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
grpc_error *error);
/** force all success bits in \a list to false */
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index d5995a5ac6..2cc979467f 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -33,6 +33,8 @@
#include "src/core/lib/iomgr/resource_quota.h"
+#include <limits.h>
+#include <stdint.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -44,6 +46,8 @@
int grpc_resource_quota_trace = 0;
+#define MEMORY_USAGE_ESTIMATION_MAX 65536
+
/* Internal linked list pointers for a resource user */
typedef struct {
grpc_resource_user *next;
@@ -126,9 +130,12 @@ struct grpc_resource_quota {
/* refcount */
gpr_refcount refs;
+ /* estimate of current memory usage
+ scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */
+ gpr_atm memory_usage_estimation;
+
/* Master combiner lock: all activity on a quota executes under this combiner
- * (so no mutex is needed for this data structure)
- */
+ * (so no mutex is needed for this data structure) */
grpc_combiner *combiner;
/* Size of the resource quota */
int64_t size;
@@ -269,6 +276,16 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
}
+/* update the atomically available resource estimate - use no barriers since
+ timeliness of delivery really doesn't matter much */
+static void rq_update_estimate(grpc_resource_quota *resource_quota) {
+ gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
+ (gpr_atm)((1.0 -
+ ((double)resource_quota->free_pool) /
+ ((double)resource_quota->size)) *
+ MEMORY_USAGE_ESTIMATION_MAX));
+}
+
/* returns true if all allocations are completed */
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
@@ -281,6 +298,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
int64_t amt = -resource_user->free_pool;
resource_user->free_pool = 0;
resource_quota->free_pool -= amt;
+ rq_update_estimate(resource_quota);
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64
" bytes; rq_free_pool -> %" PRId64,
@@ -315,6 +333,7 @@ static bool rq_reclaim_from_per_user_free_pool(
int64_t amt = resource_user->free_pool;
resource_user->free_pool = 0;
resource_quota->free_pool += amt;
+ rq_update_estimate(resource_quota);
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
" bytes; rq_free_pool -> %" PRId64,
@@ -531,6 +550,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
int64_t delta = a->size - a->resource_quota->size;
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
+ rq_update_estimate(a->resource_quota);
rq_step_sched(exec_ctx, a->resource_quota);
grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota);
gpr_free(a);
@@ -557,6 +577,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
resource_quota->size = INT64_MAX;
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
+ gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
if (name != NULL) {
resource_quota->name = gpr_strdup(name);
} else {
@@ -602,6 +623,13 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
grpc_resource_quota_ref_internal(resource_quota);
}
+double grpc_resource_quota_get_memory_pressure(
+ grpc_resource_quota *resource_quota) {
+ return ((double)(gpr_atm_no_barrier_load(
+ &resource_quota->memory_usage_estimation))) /
+ ((double)MEMORY_USAGE_ESTIMATION_MAX);
+}
+
/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index d1127ce9ea..b9f62cbf83 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -84,6 +84,12 @@ void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
+/* Return a number indicating current memory pressure:
+ 0.0 ==> no memory usage
+ 1.0 ==> maximum memory usage */
+double grpc_resource_quota_get_memory_pressure(
+ grpc_resource_quota *resource_quota);
+
typedef struct grpc_resource_user grpc_resource_user;
grpc_resource_user *grpc_resource_user_create(
diff --git a/src/core/lib/support/log_posix.c b/src/core/lib/support/log_posix.c
index f972da0887..79458dd7a3 100644
--- a/src/core/lib/support/log_posix.c
+++ b/src/core/lib/support/log_posix.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <pthread.h>
#include <stdarg.h>
@@ -93,10 +94,13 @@ void gpr_default_log(gpr_log_func_args *args) {
strcpy(time_buffer, "error:strftime");
}
- fprintf(stderr, "%s%s.%09d %7tu %s:%d] %s\n",
- gpr_log_severity_string(args->severity), time_buffer,
- (int)(now.tv_nsec), gettid(), display_file, args->line,
- args->message);
+ char *prefix;
+ gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]",
+ gpr_log_severity_string(args->severity), time_buffer,
+ (int)(now.tv_nsec), gettid(), display_file, args->line);
+
+ fprintf(stderr, "%-70s %s\n", prefix, args->message);
+ gpr_free(prefix);
}
#endif /* defined(GPR_POSIX_LOG) */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 787e4d0dd2..b338ac4c48 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -63,6 +63,7 @@
#include "src/core/lib/surface/init.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/server.h"
+#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -192,6 +193,7 @@ void grpc_init(void) {
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
grpc_register_tracer("combiner", &grpc_combiner_trace);
grpc_register_tracer("server_channel", &grpc_server_channel_trace);
+ grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace);
// Default pluck trace to 1
grpc_cq_pluck_trace = 1;
grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace);
diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c
new file mode 100644
index 0000000000..e1483677fd
--- /dev/null
+++ b/src/core/lib/transport/bdp_estimator.c
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/transport/bdp_estimator.h"
+
+#include <stdlib.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+int grpc_bdp_estimator_trace = 0;
+
+void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
+ estimator->estimate = 65536;
+ estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
+ estimator->name = name;
+}
+
+bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
+ int64_t *estimate) {
+ *estimate = estimator->estimate;
+ return true;
+}
+
+bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
+ int64_t num_bytes) {
+ estimator->accumulator += num_bytes;
+ switch (estimator->ping_state) {
+ case GRPC_BDP_PING_UNSCHEDULED:
+ return true;
+ case GRPC_BDP_PING_SCHEDULED:
+ return false;
+ case GRPC_BDP_PING_STARTED:
+ return false;
+ }
+ GPR_UNREACHABLE_CODE(return false);
+}
+
+void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64,
+ estimator->name, estimator->accumulator, estimator->estimate);
+ }
+ GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED);
+ estimator->ping_state = GRPC_BDP_PING_SCHEDULED;
+ estimator->accumulator = 0;
+}
+
+void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64,
+ estimator->name, estimator->accumulator, estimator->estimate);
+ }
+ GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
+ estimator->ping_state = GRPC_BDP_PING_STARTED;
+ estimator->accumulator = 0;
+}
+
+void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) {
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64,
+ estimator->name, estimator->accumulator, estimator->estimate);
+ }
+ GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
+ if (estimator->accumulator > 2 * estimator->estimate / 3) {
+ estimator->estimate *= 2;
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64,
+ estimator->name, estimator->estimate);
+ }
+ }
+ estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
+ estimator->accumulator = 0;
+}
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
new file mode 100644
index 0000000000..bcaf899910
--- /dev/null
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
+#define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#define GRPC_BDP_SAMPLES 16
+#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3
+
+extern int grpc_bdp_estimator_trace;
+
+typedef enum {
+ GRPC_BDP_PING_UNSCHEDULED,
+ GRPC_BDP_PING_SCHEDULED,
+ GRPC_BDP_PING_STARTED
+} grpc_bdp_estimator_ping_state;
+
+typedef struct grpc_bdp_estimator {
+ grpc_bdp_estimator_ping_state ping_state;
+ int64_t accumulator;
+ int64_t estimate;
+ const char *name;
+} grpc_bdp_estimator;
+
+void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
+
+// Returns true if a reasonable estimate could be obtained
+bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
+ int64_t *estimate);
+// Returns true if the user should schedule a ping
+bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
+ int64_t num_bytes);
+// Schedule a ping: call in response to receiving a true from
+// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
+// transport (but not necessarily started)
+void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator);
+// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once
+// the ping is on the wire
+void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator);
+// Completes a previously started ping
+void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator);
+
+#endif
diff --git a/src/core/lib/transport/pid_controller.c b/src/core/lib/transport/pid_controller.c
index 3cef225d4b..19cb1c0b36 100644
--- a/src/core/lib/transport/pid_controller.c
+++ b/src/core/lib/transport/pid_controller.c
@@ -32,26 +32,46 @@
*/
#include "src/core/lib/transport/pid_controller.h"
+#include <grpc/support/useful.h>
void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- double gain_p, double gain_i, double gain_d) {
- pid_controller->gain_p = gain_p;
- pid_controller->gain_i = gain_i;
- pid_controller->gain_d = gain_d;
+ grpc_pid_controller_args args) {
+ pid_controller->args = args;
+ pid_controller->last_control_value = args.initial_control_value;
grpc_pid_controller_reset(pid_controller);
}
void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) {
pid_controller->last_error = 0.0;
+ pid_controller->last_dc_dt = 0.0;
pid_controller->error_integral = 0.0;
}
double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
double error, double dt) {
- pid_controller->error_integral += error * dt;
+ /* integrate error using the trapezoid rule */
+ pid_controller->error_integral +=
+ dt * (pid_controller->last_error + error) * 0.5;
+ pid_controller->error_integral = GPR_CLAMP(
+ pid_controller->error_integral, -pid_controller->args.integral_range,
+ pid_controller->args.integral_range);
double diff_error = (error - pid_controller->last_error) / dt;
+ /* calculate derivative of control value vs time */
+ double dc_dt = pid_controller->args.gain_p * error +
+ pid_controller->args.gain_i * pid_controller->error_integral +
+ pid_controller->args.gain_d * diff_error;
+ /* and perform trapezoidal integration */
+ double new_control_value = pid_controller->last_control_value +
+ dt * (pid_controller->last_dc_dt + dc_dt) * 0.5;
+ new_control_value =
+ GPR_CLAMP(new_control_value, pid_controller->args.min_control_value,
+ pid_controller->args.max_control_value);
pid_controller->last_error = error;
- return dt * (pid_controller->gain_p * error +
- pid_controller->gain_i * pid_controller->error_integral +
- pid_controller->gain_d * diff_error);
+ pid_controller->last_dc_dt = dc_dt;
+ pid_controller->last_control_value = new_control_value;
+ return new_control_value;
+}
+
+double grpc_pid_controller_last(grpc_pid_controller *pid_controller) {
+ return pid_controller->last_control_value;
}
diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h
index 83c82d6471..0a86521e90 100644
--- a/src/core/lib/transport/pid_controller.h
+++ b/src/core/lib/transport/pid_controller.h
@@ -45,20 +45,33 @@ typedef struct {
double gain_p;
double gain_i;
double gain_d;
+ double initial_control_value;
+ double min_control_value;
+ double max_control_value;
+ double integral_range;
+} grpc_pid_controller_args;
+
+typedef struct {
double last_error;
double error_integral;
+ double last_control_value;
+ double last_dc_dt;
+ grpc_pid_controller_args args;
} grpc_pid_controller;
/** Initialize the controller */
void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- double gain_p, double gain_i, double gain_d);
+ grpc_pid_controller_args args);
/** Reset the controller: useful when things have changed significantly */
void grpc_pid_controller_reset(grpc_pid_controller *pid_controller);
/** Update the controller: given a current error estimate, and the time since
- the last update, returns a delta to the control value */
+ the last update, returns a new control value */
double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
double error, double dt);
+/** Returns the last control value calculated */
+double grpc_pid_controller_last(grpc_pid_controller *pid_controller);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */