diff options
author | Craig Tiller <ctiller@google.com> | 2017-04-06 18:22:39 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-06 18:22:39 -0700 |
commit | 71f1b8ebf708e8d83b6b007aba558612660ef35c (patch) | |
tree | 8fad6dc487eefef53ca2a110aa4c1c80e250a075 /src/core | |
parent | fbed2848696f7c98ac58232d4942402fd4efebdb (diff) | |
parent | fc8d671ec8b2ad977d2df6c712e2a08ab13e16ca (diff) |
Merge pull request #10231 from ctiller/dynamic_tcp_sizing
Dynamically adjust TCP read buffer size
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair.h | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_posix.c | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_uv.c | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_windows.c | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 24 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_windows.c | 21 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 114 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.h | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 22 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix.h | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_windows.c | 25 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.c | 14 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.h | 4 |
17 files changed, 170 insertions, 128 deletions
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index f46e849932..6ab176e8ad 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -57,12 +57,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, char *name; gpr_asprintf(&name, "fd:%d", fd); - grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args( - grpc_server_get_channel_args(server)); grpc_endpoint *server_endpoint = - grpc_tcp_create(grpc_fd_create(fd, name), resource_quota, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_tcp_create(&exec_ctx, grpc_fd_create(fd, name), + grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index f9de0c715e..6407a6ad3f 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -41,8 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args); #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index b9ff969e81..5542a372d8 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -62,22 +62,25 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota, - read_slice_size, "socketpair-server"); + p.client = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], final_name), args, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota, - read_slice_size, "socketpair-client"); + p.server = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[0], final_name), args, + "socketpair-client"); gpr_free(final_name); + + grpc_exec_ctx_finish(&exec_ctx); return p; } diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c index ff24894c6d..9718eb0523 100644 --- a/src/core/lib/iomgr/endpoint_pair_uv.c +++ b/src/core/lib/iomgr/endpoint_pair_uv.c @@ -41,9 +41,8 @@ #include "src/core/lib/iomgr/endpoint_pair.h" -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args) { grpc_endpoint_pair endpoint_pair; // TODO(mlumish): implement this properly under libuv GPR_ASSERT(false && diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c index 93f71b745c..25d6264dfb 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_windows.c @@ -83,15 +83,18 @@ static void create_sockets(SOCKET sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { + const char *name, grpc_channel_args *channel_args) { SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), - resource_quota, "endpoint:server"); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), - resource_quota, "endpoint:client"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + p.client = grpc_tcp_create(&exec_ctx, + grpc_winsocket_create(sv[1], "endpoint:client"), + channel_args, "endpoint:server"); + p.server = grpc_tcp_create(&exec_ctx, + grpc_winsocket_create(sv[0], "endpoint:server"), + channel_args, "endpoint:client"); + grpc_exec_ctx_finish(&exec_ctx); return p; } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 8dcd80d001..c3ee878651 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -142,6 +142,8 @@ struct grpc_resource_quota { /* Amount of free memory in the resource quota */ int64_t free_pool; + gpr_atm last_size; + /* Has rq_step been scheduled to occur? */ bool step_scheduled; /* Are we currently reclaiming memory */ @@ -581,6 +583,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { resource_quota->combiner = grpc_combiner_create(NULL); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; + gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); resource_quota->step_scheduled = false; resource_quota->reclaiming = false; gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0); @@ -643,11 +646,17 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, rq_resize_args *a = gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_ref_internal(resource_quota); a->size = (int64_t)size; + gpr_atm_no_barrier_store(&resource_quota->last_size, + (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size)); grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } +size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota) { + return (size_t)gpr_atm_no_barrier_load(&resource_quota->last_size); +} + /******************************************************************************* * grpc_resource_user channel args api */ diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index b9f62cbf83..6f99be0d51 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -90,6 +90,8 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args( double grpc_resource_quota_get_memory_pressure( grpc_resource_quota *resource_quota); +size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota); + typedef struct grpc_resource_user grpc_resource_user; grpc_resource_user *grpc_resource_user_create( diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 0485661316..bc367bdfa5 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -40,10 +40,6 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" -/* Channel arg (integer) setting how large a slice to try and read from the wire - each time recvmsg (or equivalent) is called */ -#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size" - /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index a108b10da6..a2692707d9 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -137,29 +137,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str) { - size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); - if (channel_args != NULL) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { - grpc_integer_options options = {(int)tcp_read_chunk_size, 1, - 8 * 1024 * 1024}; - tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( - &channel_args->args[i], options); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); - } - } - } - - grpc_endpoint *ep = - grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str); - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - return ep; + return grpc_tcp_create(exec_ctx, fd, channel_args, addr_str); } static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index a356564766..d6baca50ba 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -63,7 +63,7 @@ typedef struct { int refs; grpc_closure on_connect; grpc_endpoint **endpoint; - grpc_resource_quota *resource_quota; + grpc_channel_args *channel_args; } async_connect; static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, @@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { - grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota); + grpc_channel_args_destroy(exec_ctx, ac->channel_args); gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_name); gpr_free(ac); @@ -119,7 +119,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (!wsa_success) { error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); } else { - *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); + *ep = + grpc_tcp_create(exec_ctx, socket, ac->channel_args, ac->addr_name); socket = NULL; } } else { @@ -152,17 +153,6 @@ static void tcp_client_connect_impl( grpc_winsocket_callback_info *info; grpc_error *error = GRPC_ERROR_NONE; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); - if (channel_args != NULL) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); - } - } - } - *endpoint = NULL; /* Use dualstack sockets where available. */ @@ -225,7 +215,7 @@ static void tcp_client_connect_impl( ac->refs = 2; ac->addr_name = grpc_sockaddr_to_uri(addr); ac->endpoint = endpoint; - ac->resource_quota = resource_quota; + ac->channel_args = grpc_channel_args_copy(channel_args); grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); @@ -247,7 +237,6 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); grpc_closure_sched(exec_ctx, on_done, final_error); } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 4d7cf3ff51..5f4b38de2b 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -52,7 +52,9 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/profiling/timers.h" @@ -80,10 +82,14 @@ typedef struct { int fd; bool finished_edge; msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ - size_t slice_size; + double target_length; + double bytes_read_this_round; gpr_refcount refcount; gpr_atm shutdown_count; + int min_read_chunk_size; + int max_read_chunk_size; + /* garbage after the last read */ grpc_slice_buffer last_read_buffer; @@ -108,6 +114,42 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { + tcp->bytes_read_this_round += (double)bytes; +} + +static void finish_estimate(grpc_tcp *tcp) { + /* If we read >80% of the target buffer in one read loop, increase the size + of the target buffer to either the amount read, or twice its previous + value */ + if (tcp->bytes_read_this_round > tcp->target_length * 0.8) { + tcp->target_length = + GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round); + } else { + tcp->target_length = + 0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round; + } + tcp->bytes_read_this_round = 0; +} + +static size_t get_target_read_size(grpc_tcp *tcp) { + grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user); + double pressure = grpc_resource_quota_get_memory_pressure(rq); + double target = + tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0); + size_t sz = (((size_t)GPR_CLAMP(target, tcp->min_read_chunk_size, + tcp->max_read_chunk_size)) + + 255) & + ~(size_t)255; + /* don't use more than 1/16th of the overall resource quota for a single read + * alloc */ + size_t rqmax = grpc_resource_quota_peek_size(rq); + if (sz > rqmax / 16 && rqmax > 1024) { + sz = rqmax / 16; + } + return sz; +} + static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), @@ -232,9 +274,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ if (errno == EAGAIN) { - if (tcp->iov_size > 1) { - tcp->iov_size /= 2; - } + finish_estimate(tcp); /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { @@ -253,14 +293,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { + add_to_estimate(tcp, (size_t)read_bytes); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t)read_bytes, &tcp->last_read_buffer); - } else if (tcp->iov_size < MAX_READ_IOVEC) { - ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); @@ -285,11 +324,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, } static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { - if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - grpc_resource_user_alloc_slices( - exec_ctx, &tcp->slice_allocator, tcp->slice_size, - (size_t)tcp->iov_size - tcp->incoming_buffer->count, - tcp->incoming_buffer); + size_t target_read_size = get_target_read_size(tcp); + if (tcp->incoming_buffer->length < target_read_size && + tcp->incoming_buffer->count < MAX_READ_IOVEC) { + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + target_read_size, 1, tcp->incoming_buffer); } else { tcp_do_read(exec_ctx, tcp); } @@ -540,9 +579,50 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_get_peer, tcp_get_fd}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, - grpc_resource_quota *resource_quota, - size_t slice_size, const char *peer_string) { +#define MAX_CHUNK_SIZE 32 * 1024 * 1024 + +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, + const grpc_channel_args *channel_args, + const char *peer_string) { + int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + int tcp_max_read_chunk_size = 4 * 1024 * 1024; + int tcp_min_read_chunk_size = 256; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_min_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_max_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + channel_args->args[i].value.pointer.p); + } + } + } + + if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) { + tcp_min_read_chunk_size = tcp_max_read_chunk_size; + } + tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size, + tcp_max_read_chunk_size); + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->peer_string = gpr_strdup(peer_string); @@ -552,7 +632,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->release_fd_cb = NULL; tcp->release_fd = NULL; tcp->incoming_buffer = NULL; - tcp->slice_size = slice_size; + tcp->target_length = (double)tcp_read_chunk_size; + tcp->min_read_chunk_size = tcp_min_read_chunk_size; + tcp->max_read_chunk_size = tcp_max_read_chunk_size; + tcp->bytes_read_this_round = 0; tcp->iov_size = 1; tcp->finished_edge = true; /* paired with unref in grpc_tcp_destroy */ @@ -569,6 +652,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); return &tcp->base; } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 1c0d13f96e..1ad5788331 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -47,14 +47,13 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" -#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 - extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota, - size_t read_slice_size, const char *peer_string); +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + const grpc_channel_args *args, + const char *peer_string); /* Return the tcp endpoint's fd, or -1 if this is not available. Does not release the fd. diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d6a017cf7f..e66ffc9b1c 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -59,6 +59,7 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -90,7 +91,6 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; - s->resource_quota = grpc_resource_quota_create(NULL); s->expand_wildcard_addrs = false; for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { @@ -98,27 +98,14 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } - } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); - } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); - } } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { s->expand_wildcard_addrs = (args->args[i].value.integer != 0); } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer"); @@ -138,6 +125,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->head = NULL; s->tail = NULL; s->nports = 0; + s->channel_args = grpc_channel_args_copy(args); gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); *server = s; return GRPC_ERROR_NONE; @@ -158,8 +146,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { s->head = sp->next; gpr_free(sp); } - - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } @@ -286,8 +273,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, sp->server->resource_quota, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str), read_notifier_pollset, acceptor); gpr_free(name); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index f5dc8532f9..c15e2e1493 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -103,7 +103,8 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign; - grpc_resource_quota *resource_quota; + /* channel args for this server */ + grpc_channel_args *channel_args; }; /* If successful, add a listener to \a s for \a addr, set \a dsmode for the diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 12ce7d3fdd..4c17f08918 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -46,6 +46,7 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/pollset_windows.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -102,7 +103,7 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; - grpc_resource_quota *resource_quota; + grpc_channel_args *channel_args; }; /* Public function. Allocates the proper data structures to hold a @@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); - s->resource_quota = grpc_resource_quota_create(NULL); - for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { - if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); - } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); - } - } - } + s->channel_args = grpc_channel_args_copy(args); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; @@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_winsocket_destroy(sp->socket); gpr_free(sp); } - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } @@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_free(utf8_message); } gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); - ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), - sp->server->resource_quota, peer_name_string); + ep = grpc_tcp_create(exec_ctx, grpc_winsocket_create(sock, fd_name), + sp->server->channel_args, peer_name_string); gpr_free(fd_name); gpr_free(peer_name_string); } else { diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 9134883226..f74aa68793 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -430,9 +430,19 @@ static grpc_endpoint_vtable vtable = {win_read, win_get_peer, win_get_fd}; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, - grpc_resource_quota *resource_quota, +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_channel_args *channel_args, char *peer_string) { + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + channel_args->args[i].value.pointer.p); + } + } + } grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 4402de1c38..abafdb22d2 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -50,8 +50,8 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, - grpc_resource_quota *resource_quota, +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_channel_args *channel_args, char *peer_string); grpc_error *grpc_tcp_prepare_socket(SOCKET sock); |