diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_client_posix.c')
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 52 |
1 files changed, 40 insertions, 12 deletions
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 42ceb33933..860a4f8436 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -35,7 +35,7 @@ #ifdef GPR_POSIX_SOCKET -#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #include <errno.h> #include <netinet/in.h> @@ -47,6 +47,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -69,7 +70,7 @@ typedef struct { char *addr_str; grpc_endpoint **ep; grpc_closure *closure; - grpc_buffer_pool *buffer_pool; + grpc_channel_args *channel_args; } async_connect; static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { @@ -115,11 +116,38 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); - grpc_buffer_pool_internal_unref(exec_ctx, ac->buffer_pool); + grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } } +grpc_endpoint *grpc_tcp_client_create_from_fd( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, + const char *addr_str) { + size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + 8 * 1024 * 1024}; + tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( + &channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + buffer_pool = grpc_buffer_pool_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + + grpc_endpoint *ep = + grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str); + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + return ep; +} + static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; int so_error = 0; @@ -192,8 +220,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } } else { grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - *ep = grpc_tcp_create(fd, ac->buffer_pool, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args, + ac->addr_str); fd = NULL; goto finish; } @@ -230,7 +258,7 @@ finish: static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, - grpc_buffer_pool *buffer_pool, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; @@ -279,8 +307,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, fdobj = grpc_fd_create(fd, name); if (err >= 0) { - *ep = grpc_tcp_create(fdobj, buffer_pool, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, - addr_str); + *ep = + grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } @@ -305,7 +333,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, ac->refs = 2; ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; - ac->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); + ac->channel_args = grpc_channel_args_copy(channel_args); if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", @@ -327,18 +355,18 @@ done: // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, grpc_buffer_pool *buffer_pool, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, - grpc_buffer_pool *buffer_pool, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, - buffer_pool, addr, addr_len, deadline); + channel_args, addr, addr_len, deadline); } #endif |