aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c4
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c6
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c7
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c4
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c2
-rw-r--r--src/core/lib/http/httpcli.c13
-rw-r--r--src/core/lib/iomgr/buffer_pool.c17
-rw-r--r--src/core/lib/iomgr/buffer_pool.h2
-rw-r--r--src/core/lib/iomgr/tcp_client.h4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c52
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.h45
11 files changed, 128 insertions, 28 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index ddc00bd79f..f8654804e4 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -149,8 +149,8 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
c->tcp = NULL;
grpc_closure_init(&c->connected, connected, c);
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
- args->interested_parties, args->addr, args->addr_len,
- args->deadline);
+ args->interested_parties, args->channel_args,
+ args->addr, args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
index b2c5e5b088..1e5b1c22e3 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -44,6 +44,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_client_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -65,9 +66,8 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
int flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
- grpc_endpoint *client =
- grpc_tcp_create(grpc_fd_create(fd, "client"),
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client");
+ grpc_endpoint *client = grpc_tcp_client_create_from_fd(
+ &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport *transport =
grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index f36fbbfc57..2fbe03ad24 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -207,9 +207,10 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
GPR_ASSERT(c->connecting_endpoint == NULL);
gpr_mu_unlock(&c->mu);
grpc_closure_init(&c->connected_closure, connected, c);
- grpc_tcp_client_connect(
- exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint,
- args->interested_parties, args->addr, args->addr_len, args->deadline);
+ grpc_tcp_client_connect(exec_ctx, &c->connected_closure,
+ &c->newly_connecting_endpoint,
+ args->interested_parties, args->channel_args,
+ args->addr, args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index f0e07429fa..2c64878c0c 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -139,8 +139,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- err =
- grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
+ err = grpc_tcp_server_create(&exec_ctx, NULL,
+ grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index 4350543c27..020f67edd2 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -58,7 +58,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
gpr_asprintf(&name, "fd:%d", fd);
grpc_endpoint *server_endpoint = grpc_tcp_create(
- grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
+ grpc_fd_create(fd, name), NULL, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
gpr_free(name);
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 593da734f2..2f6f7e37dc 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -226,10 +226,15 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
}
addr = &req->addresses->addrs[req->next_address++];
grpc_closure_init(&req->connected, on_connected, req);
- grpc_tcp_client_connect(exec_ctx, &req->connected, &req->ep,
- req->context->pollset_set, req->buffer_pool,
- (struct sockaddr *)&addr->addr, addr->len,
- req->deadline);
+ grpc_arg arg;
+ arg.key = GRPC_ARG_BUFFER_POOL;
+ arg.type = GRPC_ARG_POINTER;
+ arg.value.pointer.p = req->buffer_pool;
+ arg.value.pointer.vtable = grpc_buffer_pool_arg_vtable();
+ grpc_channel_args args = {1, &arg};
+ grpc_tcp_client_connect(
+ exec_ctx, &req->connected, &req->ep, req->context->pollset_set, &args,
+ (struct sockaddr *)&addr->addr, addr->len, req->deadline);
}
static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c
index c7e1fcbf4a..0153bce203 100644
--- a/src/core/lib/iomgr/buffer_pool.c
+++ b/src/core/lib/iomgr/buffer_pool.c
@@ -33,6 +33,8 @@
#include "src/core/lib/iomgr/buffer_pool.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -364,6 +366,21 @@ void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) {
grpc_exec_ctx_finish(&exec_ctx);
}
+grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
+ grpc_channel_args *channel_args) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
+ if (channel_args->args[i].type == GRPC_ARG_POINTER) {
+ return grpc_buffer_pool_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ } else {
+ gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer");
+ }
+ }
+ }
+ return grpc_buffer_pool_create();
+}
+
/*******************************************************************************
* grpc_buffer_user api
*/
diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h
index 2774a445e3..2ffc5b6b75 100644
--- a/src/core/lib/iomgr/buffer_pool.h
+++ b/src/core/lib/iomgr/buffer_pool.h
@@ -41,6 +41,8 @@
grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool);
void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
grpc_buffer_pool *buffer_pool);
+grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
+ grpc_channel_args *channel_args);
typedef enum {
GRPC_BULIST_AWAITING_ALLOCATION,
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index 04e4108b35..b854e5aadc 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -39,6 +39,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
+
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
@@ -47,7 +49,7 @@
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect,
grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties,
- grpc_buffer_pool *buffer_pool,
+ const grpc_channel_args *channel_args,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 42ceb33933..860a4f8436 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -35,7 +35,7 @@
#ifdef GPR_POSIX_SOCKET
-#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_client_posix.h"
#include <errno.h>
#include <netinet/in.h>
@@ -47,6 +47,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -69,7 +70,7 @@ typedef struct {
char *addr_str;
grpc_endpoint **ep;
grpc_closure *closure;
- grpc_buffer_pool *buffer_pool;
+ grpc_channel_args *channel_args;
} async_connect;
static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
@@ -115,11 +116,38 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
- grpc_buffer_pool_internal_unref(exec_ctx, ac->buffer_pool);
+ grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
}
+grpc_endpoint *grpc_tcp_client_create_from_fd(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
+ const char *addr_str) {
+ size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ 8 * 1024 * 1024};
+ tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i], options);
+ } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+ buffer_pool = grpc_buffer_pool_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+
+ grpc_endpoint *ep =
+ grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str);
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+ return ep;
+}
+
static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
int so_error = 0;
@@ -192,8 +220,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
}
} else {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- *ep = grpc_tcp_create(fd, ac->buffer_pool,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+ *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args,
+ ac->addr_str);
fd = NULL;
goto finish;
}
@@ -230,7 +258,7 @@ finish:
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
- grpc_buffer_pool *buffer_pool,
+ const grpc_channel_args *channel_args,
const struct sockaddr *addr,
size_t addr_len, gpr_timespec deadline) {
int fd;
@@ -279,8 +307,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
- *ep = grpc_tcp_create(fdobj, buffer_pool, GRPC_TCP_DEFAULT_READ_SLICE_SIZE,
- addr_str);
+ *ep =
+ grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
goto done;
}
@@ -305,7 +333,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
ac->refs = 2;
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
- ac->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
+ ac->channel_args = grpc_channel_args_copy(channel_args);
if (grpc_tcp_trace) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
@@ -327,18 +355,18 @@ done:
// overridden by api_fuzzer.c
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
- grpc_pollset_set *interested_parties, grpc_buffer_pool *buffer_pool,
+ grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
- grpc_buffer_pool *buffer_pool,
+ const grpc_channel_args *channel_args,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) {
grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
- buffer_pool, addr, addr_len, deadline);
+ channel_args, addr, addr_len, deadline);
}
#endif
diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h
new file mode 100644
index 0000000000..d8108b8359
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_posix.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+
+grpc_endpoint *grpc_tcp_client_create_from_fd(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
+ const char *addr_str);
+
+#endif