diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_client_posix.c')
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 91 |
1 files changed, 66 insertions, 25 deletions
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 9089751d94..169ddcabed 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -31,11 +31,11 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET -#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #include <errno.h> #include <netinet/in.h> @@ -47,6 +47,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -70,9 +71,10 @@ typedef struct { char *addr_str; grpc_endpoint **ep; grpc_closure *closure; + grpc_channel_args *channel_args; } async_connect; -static grpc_error *prepare_socket(const struct sockaddr *addr, int fd, +static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd, const grpc_channel_args *channel_args) { grpc_error *err = GRPC_ERROR_NONE; @@ -126,10 +128,39 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); + grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } } +grpc_endpoint *grpc_tcp_client_create_from_fd( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, + const char *addr_str) { + size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + 8 * 1024 * 1024}; + tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( + &channel_args->args[i], options); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + + grpc_endpoint *ep = + grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); + return ep; +} + static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; int so_error = 0; @@ -177,7 +208,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { switch (so_error) { case 0: grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args, + ac->addr_str); fd = NULL; break; case ENOBUFS: @@ -227,6 +259,7 @@ finish: if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); + grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); @@ -234,26 +267,26 @@ finish: static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - const grpc_tcp_client_connect_args *args) { + grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, + gpr_timespec deadline) { int fd; grpc_dualstack_mode dsmode; int err; async_connect *ac; - struct sockaddr_in6 addr6_v4mapped; - struct sockaddr_in addr4_copy; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address addr4_copy; grpc_fd *fdobj; char *name; char *addr_str; grpc_error *error; - const struct sockaddr *addr = args->addr; - size_t addr_len = args->addr_len; *ep = NULL; /* Use dualstack sockets where available. */ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = (const struct sockaddr *)&addr6_v4mapped; - addr_len = sizeof(addr6_v4mapped); + addr = &addr6_v4mapped; } error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); @@ -264,18 +297,18 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (dsmode == GRPC_DSMODE_IPV4) { /* If we got an AF_INET socket, map the address back to IPv4. */ GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy)); - addr = (struct sockaddr *)&addr4_copy; - addr_len = sizeof(addr4_copy); + addr = &addr4_copy; } - if ((error = prepare_socket(addr, fd, args->channel_args)) != + if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } do { - GPR_ASSERT(addr_len < ~(socklen_t)0); - err = connect(fd, addr, (socklen_t)addr_len); + GPR_ASSERT(addr->len < ~(socklen_t)0); + err = + connect(fd, (const struct sockaddr *)addr->addr, (socklen_t)addr->len); } while (err < 0 && errno == EINTR); addr_str = grpc_sockaddr_to_uri(addr); @@ -284,7 +317,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, fdobj = grpc_fd_create(fd, name); if (err >= 0) { - *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); + *ep = + grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } @@ -296,19 +330,20 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, goto done; } - grpc_pollset_set_add_fd(exec_ctx, args->interested_parties, fdobj); + grpc_pollset_set_add_fd(exec_ctx, interested_parties, fdobj); ac = gpr_malloc(sizeof(async_connect)); ac->closure = closure; ac->ep = ep; ac->fd = fdobj; - ac->interested_parties = args->interested_parties; + ac->interested_parties = interested_parties; ac->addr_str = addr_str; addr_str = NULL; gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; + ac->channel_args = grpc_channel_args_copy(channel_args); if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", @@ -317,7 +352,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC), + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -329,13 +364,19 @@ done: // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( - grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_endpoint **endpoint, - const grpc_tcp_client_connect_args *args) = tcp_client_connect_impl; + grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, + gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - const grpc_tcp_client_connect_args *args) { - grpc_tcp_client_connect_impl(exec_ctx, closure, ep, args); + grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, + gpr_timespec deadline) { + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, + channel_args, addr, deadline); } #endif |