diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/connected_channel.c | 6 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.h | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 32 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.c | 38 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 5 | ||||
-rw-r--r-- | src/core/lib/support/log_posix.c | 12 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/lib/transport/bdp_estimator.c | 104 | ||||
-rw-r--r-- | src/core/lib/transport/bdp_estimator.h | 76 | ||||
-rw-r--r-- | src/core/lib/transport/pid_controller.c | 36 | ||||
-rw-r--r-- | src/core/lib/transport/pid_controller.h | 17 |
14 files changed, 326 insertions, 26 deletions
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index ccc0619e1c..068c61c92a 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -140,7 +140,7 @@ static void con_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info) {} -static const grpc_channel_filter connected_channel_filter = { +const grpc_channel_filter grpc_connected_filter = { con_start_transport_stream_op, con_start_transport_op, sizeof(call_data), @@ -158,7 +158,7 @@ static const grpc_channel_filter connected_channel_filter = { static void bind_transport(grpc_channel_stack *channel_stack, grpc_channel_element *elem, void *t) { channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(elem->filter == &connected_channel_filter); + GPR_ASSERT(elem->filter == &grpc_connected_filter); GPR_ASSERT(cd->transport == NULL); cd->transport = t; @@ -178,7 +178,7 @@ bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); GPR_ASSERT(t != NULL); return grpc_channel_stack_builder_append_filter( - builder, &connected_channel_filter, bind_transport, t); + builder, &grpc_connected_filter, bind_transport, t); } grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) { diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h index 3585c0ecbc..5c7ea9ed26 100644 --- a/src/core/lib/channel/connected_channel.h +++ b/src/core/lib/channel/connected_channel.h @@ -36,8 +36,13 @@ #include "src/core/lib/channel/channel_stack_builder.h" +extern const grpc_channel_filter grpc_connected_filter; + bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg_must_be_null); +/* Debug helper to dig the transport stream out of a call element */ +grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem); + #endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 8e4efd3370..509c1ff95d 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -51,20 +51,22 @@ void grpc_closure_list_init(grpc_closure_list *closure_list) { closure_list->head = closure_list->tail = NULL; } -void grpc_closure_list_append(grpc_closure_list *closure_list, +bool grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure, grpc_error *error) { if (closure == NULL) { GRPC_ERROR_UNREF(error); - return; + return false; } closure->error_data.error = error; closure->next_data.next = NULL; - if (closure_list->head == NULL) { + bool was_empty = (closure_list->head == NULL); + if (was_empty) { closure_list->head = closure; } else { closure_list->tail->next_data.next = closure; } closure_list->tail = closure; + return was_empty; } void grpc_closure_list_fail_all(grpc_closure_list *list, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 6748b21e59..2510d50b42 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -116,8 +116,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, void grpc_closure_list_init(grpc_closure_list *list); /** add \a closure to the end of \a list - and set \a closure's result to \a error */ -void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, + and set \a closure's result to \a error + Returns true if \a list becomes non-empty */ +bool grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, grpc_error *error); /** force all success bits in \a list to false */ diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index d5995a5ac6..2cc979467f 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -33,6 +33,8 @@ #include "src/core/lib/iomgr/resource_quota.h" +#include <limits.h> +#include <stdint.h> #include <string.h> #include <grpc/support/alloc.h> @@ -44,6 +46,8 @@ int grpc_resource_quota_trace = 0; +#define MEMORY_USAGE_ESTIMATION_MAX 65536 + /* Internal linked list pointers for a resource user */ typedef struct { grpc_resource_user *next; @@ -126,9 +130,12 @@ struct grpc_resource_quota { /* refcount */ gpr_refcount refs; + /* estimate of current memory usage + scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */ + gpr_atm memory_usage_estimation; + /* Master combiner lock: all activity on a quota executes under this combiner - * (so no mutex is needed for this data structure) - */ + * (so no mutex is needed for this data structure) */ grpc_combiner *combiner; /* Size of the resource quota */ int64_t size; @@ -269,6 +276,16 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } +/* update the atomically available resource estimate - use no barriers since + timeliness of delivery really doesn't matter much */ +static void rq_update_estimate(grpc_resource_quota *resource_quota) { + gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, + (gpr_atm)((1.0 - + ((double)resource_quota->free_pool) / + ((double)resource_quota->size)) * + MEMORY_USAGE_ESTIMATION_MAX)); +} + /* returns true if all allocations are completed */ static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { @@ -281,6 +298,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, int64_t amt = -resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool -= amt; + rq_update_estimate(resource_quota); if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -315,6 +333,7 @@ static bool rq_reclaim_from_per_user_free_pool( int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool += amt; + rq_update_estimate(resource_quota); if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -531,6 +550,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; + rq_update_estimate(a->resource_quota); rq_step_sched(exec_ctx, a->resource_quota); grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota); gpr_free(a); @@ -557,6 +577,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { resource_quota->size = INT64_MAX; resource_quota->step_scheduled = false; resource_quota->reclaiming = false; + gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0); if (name != NULL) { resource_quota->name = gpr_strdup(name); } else { @@ -602,6 +623,13 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) { grpc_resource_quota_ref_internal(resource_quota); } +double grpc_resource_quota_get_memory_pressure( + grpc_resource_quota *resource_quota) { + return ((double)(gpr_atm_no_barrier_load( + &resource_quota->memory_usage_estimation))) / + ((double)MEMORY_USAGE_ESTIMATION_MAX); +} + /* Public API */ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index d1127ce9ea..b9f62cbf83 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -84,6 +84,12 @@ void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); +/* Return a number indicating current memory pressure: + 0.0 ==> no memory usage + 1.0 ==> maximum memory usage */ +double grpc_resource_quota_get_memory_pressure( + grpc_resource_quota *resource_quota); + typedef struct grpc_resource_user grpc_resource_user; grpc_resource_user *grpc_resource_user_create( diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 3b23b47d4f..02a194c982 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -76,8 +76,10 @@ struct grpc_udp_listener { grpc_udp_server *server; grpc_resolved_address addr; grpc_closure read_closure; + grpc_closure write_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; + grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; struct grpc_udp_listener *next; @@ -304,9 +306,33 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } +static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_udp_listener *sp = arg; + + gpr_mu_lock(&(sp->server->mu)); + if (error != GRPC_ERROR_NONE) { + if (0 == --sp->server->active_ports) { + gpr_mu_unlock(&sp->server->mu); + deactivated_all_ports(exec_ctx, sp->server); + } else { + gpr_mu_unlock(&sp->server->mu); + } + return; + } + + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb); + sp->write_cb(exec_ctx, sp->emfd); + + /* Re-arm the notification event so we get another chance to write. */ + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + gpr_mu_unlock(&sp->server->mu); +} + static int add_socket_to_server(grpc_udp_server *s, int fd, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { grpc_udp_listener *sp; int port; @@ -333,6 +359,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp->emfd = grpc_fd_create(fd, name); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->read_cb = read_cb; + sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); @@ -345,6 +372,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, int grpc_udp_server_add_port(grpc_udp_server *s, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { grpc_udp_listener *sp; int allocated_port1 = -1; @@ -391,7 +419,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -413,7 +442,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { addr = &addr4_copy; } - allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); + allocated_port2 = + add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -451,6 +481,10 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_closure_init(&sp->write_closure, on_write, sp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + s->active_ports++; sp = sp->next; } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index f3c466a031..ce068cbf04 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -49,6 +49,10 @@ typedef struct grpc_udp_server grpc_udp_server; typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, struct grpc_server *server); +/* Called when the socket is writeable. */ +typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, + grpc_fd *emfd); + /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd); @@ -75,6 +79,7 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); int grpc_udp_server_add_port(grpc_udp_server *s, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, diff --git a/src/core/lib/support/log_posix.c b/src/core/lib/support/log_posix.c index f972da0887..79458dd7a3 100644 --- a/src/core/lib/support/log_posix.c +++ b/src/core/lib/support/log_posix.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <pthread.h> #include <stdarg.h> @@ -93,10 +94,13 @@ void gpr_default_log(gpr_log_func_args *args) { strcpy(time_buffer, "error:strftime"); } - fprintf(stderr, "%s%s.%09d %7tu %s:%d] %s\n", - gpr_log_severity_string(args->severity), time_buffer, - (int)(now.tv_nsec), gettid(), display_file, args->line, - args->message); + char *prefix; + gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]", + gpr_log_severity_string(args->severity), time_buffer, + (int)(now.tv_nsec), gettid(), display_file, args->line); + + fprintf(stderr, "%-70s %s\n", prefix, args->message); + gpr_free(prefix); } #endif /* defined(GPR_POSIX_LOG) */ diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 787e4d0dd2..b338ac4c48 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -63,6 +63,7 @@ #include "src/core/lib/surface/init.h" #include "src/core/lib/surface/lame_client.h" #include "src/core/lib/surface/server.h" +#include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" @@ -192,6 +193,7 @@ void grpc_init(void) { grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); grpc_register_tracer("combiner", &grpc_combiner_trace); grpc_register_tracer("server_channel", &grpc_server_channel_trace); + grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace); // Default pluck trace to 1 grpc_cq_pluck_trace = 1; grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c new file mode 100644 index 0000000000..e1483677fd --- /dev/null +++ b/src/core/lib/transport/bdp_estimator.c @@ -0,0 +1,104 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/transport/bdp_estimator.h" + +#include <stdlib.h> + +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +int grpc_bdp_estimator_trace = 0; + +void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { + estimator->estimate = 65536; + estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; + estimator->name = name; +} + +bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, + int64_t *estimate) { + *estimate = estimator->estimate; + return true; +} + +bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, + int64_t num_bytes) { + estimator->accumulator += num_bytes; + switch (estimator->ping_state) { + case GRPC_BDP_PING_UNSCHEDULED: + return true; + case GRPC_BDP_PING_SCHEDULED: + return false; + case GRPC_BDP_PING_STARTED: + return false; + } + GPR_UNREACHABLE_CODE(return false); +} + +void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) { + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, + estimator->name, estimator->accumulator, estimator->estimate); + } + GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED); + estimator->ping_state = GRPC_BDP_PING_SCHEDULED; + estimator->accumulator = 0; +} + +void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, + estimator->name, estimator->accumulator, estimator->estimate); + } + GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED); + estimator->ping_state = GRPC_BDP_PING_STARTED; + estimator->accumulator = 0; +} + +void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64, + estimator->name, estimator->accumulator, estimator->estimate); + } + GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED); + if (estimator->accumulator > 2 * estimator->estimate / 3) { + estimator->estimate *= 2; + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, + estimator->name, estimator->estimate); + } + } + estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; + estimator->accumulator = 0; +} diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h new file mode 100644 index 0000000000..bcaf899910 --- /dev/null +++ b/src/core/lib/transport/bdp_estimator.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H +#define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H + +#include <stdbool.h> +#include <stdint.h> + +#define GRPC_BDP_SAMPLES 16 +#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3 + +extern int grpc_bdp_estimator_trace; + +typedef enum { + GRPC_BDP_PING_UNSCHEDULED, + GRPC_BDP_PING_SCHEDULED, + GRPC_BDP_PING_STARTED +} grpc_bdp_estimator_ping_state; + +typedef struct grpc_bdp_estimator { + grpc_bdp_estimator_ping_state ping_state; + int64_t accumulator; + int64_t estimate; + const char *name; +} grpc_bdp_estimator; + +void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name); + +// Returns true if a reasonable estimate could be obtained +bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, + int64_t *estimate); +// Returns true if the user should schedule a ping +bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, + int64_t num_bytes); +// Schedule a ping: call in response to receiving a true from +// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a +// transport (but not necessarily started) +void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator); +// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once +// the ping is on the wire +void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator); +// Completes a previously started ping +void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator); + +#endif diff --git a/src/core/lib/transport/pid_controller.c b/src/core/lib/transport/pid_controller.c index 3cef225d4b..19cb1c0b36 100644 --- a/src/core/lib/transport/pid_controller.c +++ b/src/core/lib/transport/pid_controller.c @@ -32,26 +32,46 @@ */ #include "src/core/lib/transport/pid_controller.h" +#include <grpc/support/useful.h> void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - double gain_p, double gain_i, double gain_d) { - pid_controller->gain_p = gain_p; - pid_controller->gain_i = gain_i; - pid_controller->gain_d = gain_d; + grpc_pid_controller_args args) { + pid_controller->args = args; + pid_controller->last_control_value = args.initial_control_value; grpc_pid_controller_reset(pid_controller); } void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) { pid_controller->last_error = 0.0; + pid_controller->last_dc_dt = 0.0; pid_controller->error_integral = 0.0; } double grpc_pid_controller_update(grpc_pid_controller *pid_controller, double error, double dt) { - pid_controller->error_integral += error * dt; + /* integrate error using the trapezoid rule */ + pid_controller->error_integral += + dt * (pid_controller->last_error + error) * 0.5; + pid_controller->error_integral = GPR_CLAMP( + pid_controller->error_integral, -pid_controller->args.integral_range, + pid_controller->args.integral_range); double diff_error = (error - pid_controller->last_error) / dt; + /* calculate derivative of control value vs time */ + double dc_dt = pid_controller->args.gain_p * error + + pid_controller->args.gain_i * pid_controller->error_integral + + pid_controller->args.gain_d * diff_error; + /* and perform trapezoidal integration */ + double new_control_value = pid_controller->last_control_value + + dt * (pid_controller->last_dc_dt + dc_dt) * 0.5; + new_control_value = + GPR_CLAMP(new_control_value, pid_controller->args.min_control_value, + pid_controller->args.max_control_value); pid_controller->last_error = error; - return dt * (pid_controller->gain_p * error + - pid_controller->gain_i * pid_controller->error_integral + - pid_controller->gain_d * diff_error); + pid_controller->last_dc_dt = dc_dt; + pid_controller->last_control_value = new_control_value; + return new_control_value; +} + +double grpc_pid_controller_last(grpc_pid_controller *pid_controller) { + return pid_controller->last_control_value; } diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h index 83c82d6471..0a86521e90 100644 --- a/src/core/lib/transport/pid_controller.h +++ b/src/core/lib/transport/pid_controller.h @@ -45,20 +45,33 @@ typedef struct { double gain_p; double gain_i; double gain_d; + double initial_control_value; + double min_control_value; + double max_control_value; + double integral_range; +} grpc_pid_controller_args; + +typedef struct { double last_error; double error_integral; + double last_control_value; + double last_dc_dt; + grpc_pid_controller_args args; } grpc_pid_controller; /** Initialize the controller */ void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - double gain_p, double gain_i, double gain_d); + grpc_pid_controller_args args); /** Reset the controller: useful when things have changed significantly */ void grpc_pid_controller_reset(grpc_pid_controller *pid_controller); /** Update the controller: given a current error estimate, and the time since - the last update, returns a delta to the control value */ + the last update, returns a new control value */ double grpc_pid_controller_update(grpc_pid_controller *pid_controller, double error, double dt); +/** Returns the last control value calculated */ +double grpc_pid_controller_last(grpc_pid_controller *pid_controller); + #endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */ |