aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_client_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_client_posix.c')
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c73
1 files changed, 63 insertions, 10 deletions
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index e7ae1ef695..a3a70a8ed7 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -35,7 +35,7 @@
#ifdef GRPC_POSIX_SOCKET
-#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_client_posix.h"
#include <errno.h>
#include <netinet/in.h>
@@ -47,9 +47,11 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/timer.h"
@@ -69,9 +71,11 @@ typedef struct {
char *addr_str;
grpc_endpoint **ep;
grpc_closure *closure;
+ grpc_channel_args *channel_args;
} async_connect;
-static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
+static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd,
+ const grpc_channel_args *channel_args) {
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@@ -86,6 +90,16 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
+ if (channel_args) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
+ GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
+ grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p;
+ err = grpc_set_socket_with_mutator(fd, mutator);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+ }
+ }
goto done;
error:
@@ -114,10 +128,39 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
+ grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
}
+grpc_endpoint *grpc_tcp_client_create_from_fd(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
+ const char *addr_str) {
+ size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ 8 * 1024 * 1024};
+ tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i], options);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+
+ grpc_endpoint *ep =
+ grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str);
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ return ep;
+}
+
static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
int so_error = 0;
@@ -165,7 +208,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
switch (so_error) {
case 0:
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+ *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args,
+ ac->addr_str);
fd = NULL;
break;
case ENOBUFS:
@@ -207,14 +251,18 @@ finish:
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
- error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
- "Failed to connect to remote host");
+ char *error_descr;
+ gpr_asprintf(&error_descr, "Failed to connect to remote host: %s",
+ grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION));
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr);
+ gpr_free(error_descr);
error =
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
}
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
+ grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
@@ -223,6 +271,7 @@ finish:
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
+ const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) {
int fd;
@@ -253,7 +302,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
addr = &addr4_copy;
}
- if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
+ if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
return;
}
@@ -270,7 +319,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
- *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
+ *ep =
+ grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
goto done;
}
@@ -295,6 +345,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
ac->refs = 2;
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
+ ac->channel_args = grpc_channel_args_copy(channel_args);
if (grpc_tcp_trace) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
@@ -316,16 +367,18 @@ done:
// overridden by api_fuzzer.c
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
- grpc_pollset_set *interested_parties, const grpc_resolved_address *addr,
+ grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
+ const grpc_resolved_address *addr,
gpr_timespec deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
+ const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) {
- grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
- deadline);
+ grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
+ channel_args, addr, deadline);
}
#endif