diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/transport/chttp2/client/insecure/channel_create.c | 4 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c | 6 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/client/secure/secure_channel_create.c | 7 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/server/insecure/server_chttp2.c | 4 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/http/httpcli.c | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.c | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 52 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.h | 45 |
11 files changed, 128 insertions, 28 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index ddc00bd79f..f8654804e4 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -149,8 +149,8 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, c->tcp = NULL; grpc_closure_init(&c->connected, connected, c); grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, - args->interested_parties, args->addr, args->addr_len, - args->deadline); + args->interested_parties, args->channel_args, + args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c index b2c5e5b088..1e5b1c22e3 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -44,6 +44,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" @@ -65,9 +66,8 @@ grpc_channel *grpc_insecure_channel_create_from_fd( int flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); - grpc_endpoint *client = - grpc_tcp_create(grpc_fd_create(fd, "client"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client"); + grpc_endpoint *client = grpc_tcp_client_create_from_fd( + &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client"); grpc_transport *transport = grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index f36fbbfc57..2fbe03ad24 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -207,9 +207,10 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, GPR_ASSERT(c->connecting_endpoint == NULL); gpr_mu_unlock(&c->mu); grpc_closure_init(&c->connected_closure, connected, c); - grpc_tcp_client_connect( - exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->addr, args->addr_len, args->deadline); + grpc_tcp_client_connect(exec_ctx, &c->connected_closure, + &c->newly_connecting_endpoint, + args->interested_parties, args->channel_args, + args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index f0e07429fa..2c64878c0c 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -139,8 +139,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { goto error; } - err = - grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp); + err = grpc_tcp_server_create(&exec_ctx, NULL, + grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index 4350543c27..020f67edd2 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -58,7 +58,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, gpr_asprintf(&name, "fd:%d", fd); grpc_endpoint *server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); + grpc_fd_create(fd, name), NULL, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); gpr_free(name); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 593da734f2..2f6f7e37dc 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -226,10 +226,15 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, } addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); - grpc_tcp_client_connect(exec_ctx, &req->connected, &req->ep, - req->context->pollset_set, req->buffer_pool, - (struct sockaddr *)&addr->addr, addr->len, - req->deadline); + grpc_arg arg; + arg.key = GRPC_ARG_BUFFER_POOL; + arg.type = GRPC_ARG_POINTER; + arg.value.pointer.p = req->buffer_pool; + arg.value.pointer.vtable = grpc_buffer_pool_arg_vtable(); + grpc_channel_args args = {1, &arg}; + grpc_tcp_client_connect( + exec_ctx, &req->connected, &req->ep, req->context->pollset_set, &args, + (struct sockaddr *)&addr->addr, addr->len, req->deadline); } static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index c7e1fcbf4a..0153bce203 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -33,6 +33,8 @@ #include "src/core/lib/iomgr/buffer_pool.h" +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -364,6 +366,21 @@ void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) { grpc_exec_ctx_finish(&exec_ctx); } +grpc_buffer_pool *grpc_buffer_pool_from_channel_args( + grpc_channel_args *channel_args) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + if (channel_args->args[i].type == GRPC_ARG_POINTER) { + return grpc_buffer_pool_internal_ref( + channel_args->args[i].value.pointer.p); + } else { + gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer"); + } + } + } + return grpc_buffer_pool_create(); +} + /******************************************************************************* * grpc_buffer_user api */ diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index 2774a445e3..2ffc5b6b75 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -41,6 +41,8 @@ grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool); void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); +grpc_buffer_pool *grpc_buffer_pool_from_channel_args( + grpc_channel_args *channel_args); typedef enum { GRPC_BULIST_AWAITING_ALLOCATION, diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 04e4108b35..b854e5aadc 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -39,6 +39,8 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/sockaddr.h" +#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size" + /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). @@ -47,7 +49,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, - grpc_buffer_pool *buffer_pool, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 42ceb33933..860a4f8436 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -35,7 +35,7 @@ #ifdef GPR_POSIX_SOCKET -#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #include <errno.h> #include <netinet/in.h> @@ -47,6 +47,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -69,7 +70,7 @@ typedef struct { char *addr_str; grpc_endpoint **ep; grpc_closure *closure; - grpc_buffer_pool *buffer_pool; + grpc_channel_args *channel_args; } async_connect; static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { @@ -115,11 +116,38 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); - grpc_buffer_pool_internal_unref(exec_ctx, ac->buffer_pool); + grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } } +grpc_endpoint *grpc_tcp_client_create_from_fd( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, + const char *addr_str) { + size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + 8 * 1024 * 1024}; + tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( + &channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + buffer_pool = grpc_buffer_pool_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + + grpc_endpoint *ep = + grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str); + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + return ep; +} + static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; int so_error = 0; @@ -192,8 +220,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } } else { grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - *ep = grpc_tcp_create(fd, ac->buffer_pool, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args, + ac->addr_str); fd = NULL; goto finish; } @@ -230,7 +258,7 @@ finish: static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, - grpc_buffer_pool *buffer_pool, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; @@ -279,8 +307,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, fdobj = grpc_fd_create(fd, name); if (err >= 0) { - *ep = grpc_tcp_create(fdobj, buffer_pool, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, - addr_str); + *ep = + grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } @@ -305,7 +333,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, ac->refs = 2; ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; - ac->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); + ac->channel_args = grpc_channel_args_copy(channel_args); if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", @@ -327,18 +355,18 @@ done: // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, grpc_buffer_pool *buffer_pool, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, - grpc_buffer_pool *buffer_pool, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, - buffer_pool, addr, addr_len, deadline); + channel_args, addr, addr_len, deadline); } #endif diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h new file mode 100644 index 0000000000..d8108b8359 --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -0,0 +1,45 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H +#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H + +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/tcp_client.h" + +grpc_endpoint *grpc_tcp_client_create_from_fd( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, + const char *addr_str); + +#endif |