Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.c')
-rw-r--r--  src/core/lib/iomgr/tcp_posix.c  114
1 file changed, 99 insertions(+), 15 deletions(-)
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 4d7cf3ff51..5f4b38de2b 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -52,7 +52,9 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
@@ -80,10 +82,14 @@ typedef struct {
int fd;
bool finished_edge;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
- size_t slice_size;
+ double target_length;
+ double bytes_read_this_round;
gpr_refcount refcount;
gpr_atm shutdown_count;
+ int min_read_chunk_size;
+ int max_read_chunk_size;
+
/* garbage after the last read */
grpc_slice_buffer last_read_buffer;
@@ -108,6 +114,42 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+static void add_to_estimate(grpc_tcp *tcp, size_t bytes) {
+ tcp->bytes_read_this_round += (double)bytes;
+}
+
+static void finish_estimate(grpc_tcp *tcp) {
+ /* If we read >80% of the target buffer in one read loop, increase the size
+ of the target buffer to either the amount read, or twice its previous
+ value */
+ if (tcp->bytes_read_this_round > tcp->target_length * 0.8) {
+ tcp->target_length =
+ GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round);
+ } else {
+ tcp->target_length =
+ 0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round;
+ }
+ tcp->bytes_read_this_round = 0;
+}
+
+static size_t get_target_read_size(grpc_tcp *tcp) {
+ grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user);
+ double pressure = grpc_resource_quota_get_memory_pressure(rq);
+ double target =
+ tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0);
+ size_t sz = (((size_t)GPR_CLAMP(target, tcp->min_read_chunk_size,
+ tcp->max_read_chunk_size)) +
+ 255) &
+ ~(size_t)255;
+ /* don't use more than 1/16th of the overall resource quota for a single read
+ * alloc */
+ size_t rqmax = grpc_resource_quota_peek_size(rq);
+ if (sz > rqmax / 16 && rqmax > 1024) {
+ sz = rqmax / 16;
+ }
+ return sz;
+}
+
static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
return grpc_error_set_str(
grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
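
The helpers added above implement the adaptive sizing policy: fill more than 80% of the target in one read round and the target doubles (or jumps to the amount actually read, if that is larger); read less and the target decays slowly toward the observed size. Below is a standalone sketch of that policy with the grpc_tcp fields reduced to plain values so the arithmetic can be traced on its own; the 8192-byte starting target and the demo read sizes are illustrative, and clamping to the min/max chunk sizes happens later, in get_target_read_size.

#include <stdio.h>

/* Illustrative stand-in for tcp->target_length; only the policy constants
   (0.8, 2x, 0.99/0.01) are taken from the patch above. */
static double next_target(double target, double bytes_read_this_round) {
  if (bytes_read_this_round > target * 0.8) {
    /* Read >80% of the target in one round: grow fast, to the larger of
       twice the old target or the amount actually read. */
    double doubled = 2 * target;
    return bytes_read_this_round > doubled ? bytes_read_this_round : doubled;
  }
  /* Otherwise decay slowly toward the observed round size (EWMA). */
  return 0.99 * target + 0.01 * bytes_read_this_round;
}

int main(void) {
  double target = 8192; /* assumed starting target for the demo */
  for (int i = 0; i < 3; i++) {
    target = next_target(target, target); /* rounds that fill the buffer */
    printf("full round %d -> target %.0f\n", i + 1, target);
  }
  for (int i = 0; i < 3; i++) {
    target = next_target(target, 512); /* rounds that read little */
    printf("small round %d -> target %.0f\n", i + 1, target);
  }
  return 0;
}

A few full rounds double the target each time (8192 -> 16384 -> 32768 -> 65536), while a run of small reads only nudges it back down, so the buffer grows quickly for bulk transfers but shrinks gently for chatty connections.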
@@ -232,9 +274,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
- if (tcp->iov_size > 1) {
- tcp->iov_size /= 2;
- }
+ finish_estimate(tcp);
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
@@ -253,14 +293,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
+ add_to_estimate(tcp, (size_t)read_bytes);
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
grpc_slice_buffer_trim_end(
tcp->incoming_buffer,
tcp->incoming_buffer->length - (size_t)read_bytes,
&tcp->last_read_buffer);
- } else if (tcp->iov_size < MAX_READ_IOVEC) {
- ++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
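
Taken together, these hunks define a read "round" as everything received between two EAGAINs: each successful recvmsg feeds add_to_estimate, and draining the edge closes the round via finish_estimate, replacing the old iov_size halving/increment heuristic. A minimal sketch of that bracketing, collapsed into one loop for illustration (the real code spans multiple read callbacks and also handles EOF, hard errors, and refcounting):

#include <errno.h>
#include <stdio.h>

/* Minimal stand-ins for the patch's estimator state and hooks. */
typedef struct {
  double target_length;
  double bytes_read_this_round;
} read_estimator;

static void add_to_estimate(read_estimator *e, size_t bytes) {
  e->bytes_read_this_round += (double)bytes;
}

static void finish_estimate(read_estimator *e) {
  /* Same grow-or-decay policy as the previous sketch. */
  double b = e->bytes_read_this_round, t = e->target_length;
  e->target_length =
      b > t * 0.8 ? (b > 2 * t ? b : 2 * t) : 0.99 * t + 0.01 * b;
  e->bytes_read_this_round = 0;
}

/* read_once stands in for the recvmsg() call in tcp_do_read: it returns
   bytes read, 0 on EOF, or -1 with errno == EAGAIN once the edge drains. */
static void read_round(read_estimator *e, long (*read_once)(void)) {
  for (;;) {
    long n = read_once();
    if (n > 0) {
      add_to_estimate(e, (size_t)n); /* mirrors the success path above */
    } else if (n < 0 && errno == EAGAIN) {
      finish_estimate(e); /* edge consumed: fold the round into the target */
      return;             /* the real code re-arms grpc_fd_notify_on_read */
    } else {
      return; /* EOF or hard error: the real code reports and unrefs */
    }
  }
}

static long fake_reads(void) { /* two 4 KiB reads, then EAGAIN */
  static int calls = 0;
  if (calls++ < 2) return 4096;
  errno = EAGAIN;
  return -1;
}

int main(void) {
  read_estimator e = {8192, 0};
  read_round(&e, fake_reads);
  printf("target after round: %.0f\n", e.target_length); /* 16384 */
  return 0;
}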
@@ -285,11 +324,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
}
static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
- if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- grpc_resource_user_alloc_slices(
- exec_ctx, &tcp->slice_allocator, tcp->slice_size,
- (size_t)tcp->iov_size - tcp->incoming_buffer->count,
- tcp->incoming_buffer);
+ size_t target_read_size = get_target_read_size(tcp);
+ if (tcp->incoming_buffer->length < target_read_size &&
+ tcp->incoming_buffer->count < MAX_READ_IOVEC) {
+ grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
+ target_read_size, 1, tcp->incoming_buffer);
} else {
tcp_do_read(exec_ctx, tcp);
}
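
tcp_continue_read now requests a single allocation of get_target_read_size() bytes rather than iov_size fixed-size slices. A sketch of the shaping that function applies, with the resource-quota calls replaced by plain parameters so the arithmetic stands alone (the demo pressure and quota values are made up):

#include <stdio.h>

static size_t shape_read_size(double target, double pressure,
                              size_t min_chunk, size_t max_chunk,
                              size_t quota_size) {
  /* Above 80% memory pressure, scale the target linearly down to zero. */
  if (pressure > 0.8) target *= (1.0 - pressure) / 0.2;
  /* Clamp to the configured chunk bounds (GPR_CLAMP in the patch). */
  if (target < (double)min_chunk) target = (double)min_chunk;
  if (target > (double)max_chunk) target = (double)max_chunk;
  /* Round up to a 256-byte boundary, as in the patch. */
  size_t sz = ((size_t)target + 255) & ~(size_t)255;
  /* Never let one read allocation take more than 1/16th of the quota. */
  if (sz > quota_size / 16 && quota_size > 1024) sz = quota_size / 16;
  return sz;
}

int main(void) {
  /* No pressure: 8000 rounds up to 8192. */
  printf("%zu\n", shape_read_size(8000, 0.0, 256, 4 << 20, 1 << 30));
  /* 90% pressure halves the target: 8000 * 0.5 = 4000 -> 4096. */
  printf("%zu\n", shape_read_size(8000, 0.9, 256, 4 << 20, 1 << 30));
  /* A tiny 64 KiB quota caps the read at 64 KiB / 16 = 4096 bytes. */
  printf("%zu\n", shape_read_size(8000, 0.0, 256, 4 << 20, 64 << 10));
  return 0;
}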
@@ -540,9 +579,50 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_get_peer,
tcp_get_fd};
-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
- grpc_resource_quota *resource_quota,
- size_t slice_size, const char *peer_string) {
+#define MAX_CHUNK_SIZE (32 * 1024 * 1024)
+
+grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
+ const grpc_channel_args *channel_args,
+ const char *peer_string) {
+ int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
+ int tcp_max_read_chunk_size = 4 * 1024 * 1024;
+ int tcp_min_read_chunk_size = 256;
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ MAX_CHUNK_SIZE};
+ tcp_read_chunk_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ MAX_CHUNK_SIZE};
+ tcp_min_read_chunk_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ MAX_CHUNK_SIZE};
+ tcp_max_read_chunk_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+
+ if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
+ tcp_min_read_chunk_size = tcp_max_read_chunk_size;
+ }
+ tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size,
+ tcp_max_read_chunk_size);
+
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
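
With the new signature, callers no longer pass a slice_size; the chunk sizes arrive as channel args. A hypothetical client-side example of setting them (the GRPC_ARG_TCP_*_CHUNK_SIZE keys are the ones handled above; the target address and the chosen values are made up for the demo):

#include <grpc/grpc.h>

int main(void) {
  grpc_init();

  /* Start reads at 8 KiB, but let the adaptive estimator range between
     4 KiB and 1 MiB per read. */
  grpc_arg args[3];
  args[0].type = GRPC_ARG_INTEGER;
  args[0].key = GRPC_ARG_TCP_READ_CHUNK_SIZE;
  args[0].value.integer = 8192;
  args[1].type = GRPC_ARG_INTEGER;
  args[1].key = GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE;
  args[1].value.integer = 4096;
  args[2].type = GRPC_ARG_INTEGER;
  args[2].key = GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE;
  args[2].value.integer = 1024 * 1024;
  grpc_channel_args channel_args = {3, args};

  grpc_channel *channel =
      grpc_insecure_channel_create("localhost:50051", &channel_args, NULL);

  /* ... use the channel ... */

  grpc_channel_destroy(channel);
  grpc_shutdown();
  return 0;
}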
@@ -552,7 +632,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
tcp->release_fd_cb = NULL;
tcp->release_fd = NULL;
tcp->incoming_buffer = NULL;
- tcp->slice_size = slice_size;
+ tcp->target_length = (double)tcp_read_chunk_size;
+ tcp->min_read_chunk_size = tcp_min_read_chunk_size;
+ tcp->max_read_chunk_size = tcp_max_read_chunk_size;
+ tcp->bytes_read_this_round = 0;
tcp->iov_size = 1;
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
@@ -569,6 +652,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
&tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return &tcp->base;
}