diff options
author | Yuchen Zeng <zyc@google.com> | 2016-11-02 12:06:57 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-11-02 12:06:57 -0700 |
commit | 1c0b7a2d180f44012b302eb17ce103f6b4d45752 (patch) | |
tree | 977457c5f4aac78f1e364d7e64adf5f0e2e8b869 /src/core/ext/transport | |
parent | 68413c221e341f4f00326e892077c5fcd4b4f788 (diff) | |
parent | 11948f74414e6c95b81fbcc2f0d06afa0b1cbce5 (diff) |
Merge remote-tracking branch 'upstream/master' into get_tos
Diffstat (limited to 'src/core/ext/transport')
11 files changed, 304 insertions, 164 deletions
diff --git a/src/core/ext/transport/chttp2/alpn/alpn.c b/src/core/ext/transport/chttp2/alpn/alpn.c index 48b0217265..55710dc5ae 100644 --- a/src/core/ext/transport/chttp2/alpn/alpn.c +++ b/src/core/ext/transport/chttp2/alpn/alpn.c @@ -36,7 +36,7 @@ #include <grpc/support/useful.h> /* in order of preference */ -static const char *const supported_versions[] = {"h2"}; +static const char *const supported_versions[] = {"grpc-exp", "h2"}; int grpc_chttp2_is_alpn_version_supported(const char *version, size_t size) { size_t i; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index c2b59569fd..71a06e118b 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -40,9 +40,9 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include "src/core/ext/client_config/client_channel.h" -#include "src/core/ext/client_config/http_connect_handshaker.h" -#include "src/core/ext/client_config/resolver_registry.h" +#include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" +#include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/compress_filter.h" @@ -52,6 +52,10 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" +// +// connector +// + typedef struct { grpc_connector base; gpr_refcount refs; @@ -150,42 +154,27 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, c->tcp = NULL; grpc_closure_init(&c->connected, connected, c); grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, - args->interested_parties, args->addr, args->addr_len, - args->deadline); + args->interested_parties, args->channel_args, + args->addr, args->deadline); } static const grpc_connector_vtable connector_vtable = { connector_ref, connector_unref, connector_shutdown, connector_connect}; -typedef struct { - grpc_client_channel_factory base; - gpr_refcount refs; - grpc_channel_args *merge_args; -} client_channel_factory; +// +// client_channel_factory +// static void client_channel_factory_ref( - grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - gpr_ref(&f->refs); -} + grpc_client_channel_factory *cc_factory) {} static void client_channel_factory_unref( - grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - if (gpr_unref(&f->refs)) { - grpc_channel_args_destroy(f->merge_args); - gpr_free(f); - } -} + grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {} static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, - grpc_subchannel_args *args) { - client_channel_factory *f = (client_channel_factory *)cc_factory; + const grpc_subchannel_args *args) { connector *c = gpr_malloc(sizeof(*c)); - grpc_channel_args *final_args = - grpc_channel_args_merge(args->args, f->merge_args); - grpc_subchannel *s; memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); @@ -197,23 +186,18 @@ static grpc_subchannel *client_channel_factory_create_subchannel( grpc_http_connect_handshaker_create(proxy_name, args->server_name)); gpr_free(proxy_name); } - args->args = final_args; - s = grpc_subchannel_create(exec_ctx, &c->base, args); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); - grpc_channel_args_destroy(final_args); return s; } static grpc_channel *client_channel_factory_create_channel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const char *target, grpc_client_channel_type type, - grpc_channel_args *args) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args); - grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, - GRPC_CLIENT_CHANNEL, NULL); - grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target); + const grpc_channel_args *args) { + grpc_channel *channel = + grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL); + grpc_resolver *resolver = grpc_resolver_create(target, args); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); @@ -221,7 +205,7 @@ static grpc_channel *client_channel_factory_create_channel( } grpc_client_channel_finish_initialization( - exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); + exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel"); return channel; @@ -232,6 +216,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable = client_channel_factory_create_subchannel, client_channel_factory_create_channel}; +static grpc_client_channel_factory client_channel_factory = { + &client_channel_factory_vtable}; + /* Create a client channel: Asynchronously: - resolve target - connect to it (trying alternatives as presented) @@ -245,16 +232,12 @@ grpc_channel *grpc_insecure_channel_create(const char *target, (target, args, reserved)); GPR_ASSERT(!reserved); - client_channel_factory *f = gpr_malloc(sizeof(*f)); - memset(f, 0, sizeof(*f)); - f->base.vtable = &client_channel_factory_vtable; - gpr_ref_init(&f->refs, 1); - f->merge_args = grpc_channel_args_copy(args); - + grpc_client_channel_factory *factory = + (grpc_client_channel_factory *)&client_channel_factory; grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); + &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args); - grpc_client_channel_factory_unref(&exec_ctx, &f->base); + grpc_client_channel_factory_unref(&exec_ctx, factory); grpc_exec_ctx_finish(&exec_ctx); return channel != NULL ? channel : grpc_lame_client_channel_create( diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c index b2c5e5b088..1e5b1c22e3 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -44,6 +44,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" @@ -65,9 +66,8 @@ grpc_channel *grpc_insecure_channel_create_from_fd( int flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); - grpc_endpoint *client = - grpc_tcp_create(grpc_fd_create(fd, "client"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client"); + grpc_endpoint *client = grpc_tcp_client_create_from_fd( + &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client"); grpc_transport *transport = grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 31c54ff74c..d0ac72a011 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -40,9 +40,9 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include "src/core/ext/client_config/client_channel.h" -#include "src/core/ext/client_config/http_connect_handshaker.h" -#include "src/core/ext/client_config/resolver_registry.h" +#include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" +#include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" @@ -54,6 +54,10 @@ #include "src/core/lib/surface/channel.h" #include "src/core/lib/tsi/transport_security_interface.h" +// +// connector +// + typedef struct { grpc_connector base; gpr_refcount refs; @@ -210,16 +214,19 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, grpc_closure_init(&c->connected_closure, connected, c); grpc_tcp_client_connect( exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->addr, args->addr_len, args->deadline); + args->interested_parties, args->channel_args, args->addr, args->deadline); } static const grpc_connector_vtable connector_vtable = { connector_ref, connector_unref, connector_shutdown, connector_connect}; +// +// client_channel_factory +// + typedef struct { grpc_client_channel_factory base; gpr_refcount refs; - grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; } client_channel_factory; @@ -235,19 +242,15 @@ static void client_channel_factory_unref( if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "client_channel_factory"); - grpc_channel_args_destroy(f->merge_args); gpr_free(f); } } static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, - grpc_subchannel_args *args) { + const grpc_subchannel_args *args) { client_channel_factory *f = (client_channel_factory *)cc_factory; connector *c = gpr_malloc(sizeof(*c)); - grpc_channel_args *final_args = - grpc_channel_args_merge(args->args, f->merge_args); - grpc_subchannel *s; memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; @@ -261,25 +264,19 @@ static grpc_subchannel *client_channel_factory_create_subchannel( } gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); - args->args = final_args; - s = grpc_subchannel_create(exec_ctx, &c->base, args); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); - grpc_channel_args_destroy(final_args); return s; } static grpc_channel *client_channel_factory_create_channel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const char *target, grpc_client_channel_type type, - grpc_channel_args *args) { + const grpc_channel_args *args) { client_channel_factory *f = (client_channel_factory *)cc_factory; - - grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args); - grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, - GRPC_CLIENT_CHANNEL, NULL); - grpc_channel_args_destroy(final_args); - - grpc_resolver *resolver = grpc_resolver_create(target); + grpc_channel *channel = + grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL); + grpc_resolver *resolver = grpc_resolver_create(target, args); if (resolver != NULL) { grpc_client_channel_finish_initialization( exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); @@ -289,9 +286,6 @@ static grpc_channel *client_channel_factory_create_channel( "client_channel_factory_create_channel"); channel = NULL; } - - GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, - "client_channel_factory_create_channel"); return channel; } @@ -308,19 +302,13 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, const char *target, const grpc_channel_args *args, void *reserved) { - grpc_arg connector_arg; - grpc_channel_args *args_copy; - grpc_channel_args *new_args_from_connector; - grpc_channel_security_connector *security_connector; - client_channel_factory *f; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_API_TRACE( "grpc_secure_channel_create(creds=%p, target=%s, args=%p, " "reserved=%p)", 4, (creds, target, args, reserved)); GPR_ASSERT(reserved == NULL); - + // Make sure security connector does not already exist in args. if (grpc_find_security_connector_in_args(args) != NULL) { gpr_log(GPR_ERROR, "Cannot set security context in channel args."); grpc_exec_ctx_finish(&exec_ctx); @@ -328,7 +316,9 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, target, GRPC_STATUS_INTERNAL, "Security connector exists in channel args."); } - + // Create security connector and construct new channel args. + grpc_channel_security_connector *security_connector; + grpc_channel_args *new_args_from_connector; if (grpc_channel_credentials_create_security_connector( creds, target, args, &security_connector, &new_args_from_connector) != GRPC_SECURITY_OK) { @@ -336,32 +326,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, return grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); } - - connector_arg = grpc_security_connector_to_arg(&security_connector->base); - args_copy = grpc_channel_args_copy_and_add( + grpc_arg connector_arg = + grpc_security_connector_to_arg(&security_connector->base); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add( new_args_from_connector != NULL ? new_args_from_connector : args, &connector_arg, 1); - - f = gpr_malloc(sizeof(*f)); - memset(f, 0, sizeof(*f)); - f->base.vtable = &client_channel_factory_vtable; - gpr_ref_init(&f->refs, 1); - - f->merge_args = grpc_channel_args_copy(args_copy); - grpc_channel_args_destroy(args_copy); if (new_args_from_connector != NULL) { grpc_channel_args_destroy(new_args_from_connector); } - + // Create client channel factory. + client_channel_factory *f = gpr_malloc(sizeof(*f)); + memset(f, 0, sizeof(*f)); + f->base.vtable = &client_channel_factory_vtable; + gpr_ref_init(&f->refs, 1); GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "grpc_secure_channel_create"); f->security_connector = security_connector; - + // Create channel. grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); - + &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); + // Clean up. + GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, + "secure_client_channel_factory_create_channel"); + grpc_channel_args_destroy(new_args); grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); - return channel; /* may be NULL */ } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index f0e07429fa..d42611b863 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -139,8 +139,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { goto error; } - err = - grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp); + err = grpc_tcp_server_create(&exec_ctx, NULL, + grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; } @@ -148,9 +148,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { const size_t naddrs = resolved->naddrs; errors = gpr_malloc(sizeof(*errors) * naddrs); for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port( - tcp, (struct sockaddr *)&resolved->addrs[i].addr, - resolved->addrs[i].len, &port_temp); + errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index 9af17fb5ae..aa2ecf5743 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -57,8 +57,12 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, char *name; gpr_asprintf(&name, "fd:%d", fd); - grpc_endpoint *server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); + grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args( + grpc_server_get_channel_args(server)); + grpc_endpoint *server_endpoint = + grpc_tcp_create(grpc_fd_create(fd, name), resource_quota, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 563271f4f8..7ad687042d 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -271,7 +271,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, memset(server_state, 0, sizeof(*server_state)); grpc_closure_init(&server_state->tcp_server_shutdown_complete, tcp_server_shutdown_complete, server_state); - err = grpc_tcp_server_create(&server_state->tcp_server_shutdown_complete, + err = grpc_tcp_server_create(&exec_ctx, + &server_state->tcp_server_shutdown_complete, grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; @@ -286,9 +287,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port( - tcp, (struct sockaddr *)&resolved->addrs[i].addr, - resolved->addrs[i].len, &port_temp); + errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 9f7bc818b6..b61a3a0314 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -114,6 +114,20 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); +static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + +static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); +static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); + static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -241,6 +255,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); grpc_closure_init(&t->read_action_begin, read_action_begin, t); grpc_closure_init(&t->read_action_locked, read_action_locked, t); + grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); + grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); + grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); + grpc_closure_init(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -362,6 +381,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); + post_benign_reclaimer(exec_ctx, t); } static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, @@ -467,6 +487,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + post_destructive_reclaimer(exec_ctx, t); } GPR_TIMER_END("init_stream", 0); @@ -675,6 +696,13 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); } + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent")); + } + } + grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); switch (t->write_state) { @@ -780,6 +808,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + post_destructive_reclaimer(exec_ctx, t); grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); } /* cancel out streams that will never be started */ @@ -853,61 +882,60 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s); + grpc_chttp2_stream *s) { + s->fetched_send_message_length += + (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); + gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); + if (s->id != 0) { + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + } +} static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (s->fetching_send_message == NULL) { - /* Stream was cancelled before message fetch completed */ - abort(); /* TODO(ctiller): what cleanup here? */ - return; - } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - int64_t notify_offset = s->next_message_end_offset; - if (notify_offset <= s->flow_controlled_bytes_written) { - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, - "fetching_send_message_finished"); - } else { - grpc_chttp2_write_cb *cb = t->write_cb_pool; - if (cb == NULL) { - cb = gpr_malloc(sizeof(*cb)); + for (;;) { + if (s->fetching_send_message == NULL) { + /* Stream was cancelled before message fetch completed */ + abort(); /* TODO(ctiller): what cleanup here? */ + return; /* early out */ + } + if (s->fetched_send_message_length == s->fetching_send_message->length) { + int64_t notify_offset = s->next_message_end_offset; + if (notify_offset <= s->flow_controlled_bytes_written) { + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + "fetching_send_message_finished"); } else { - t->write_cb_pool = cb->next; + grpc_chttp2_write_cb *cb = t->write_cb_pool; + if (cb == NULL) { + cb = gpr_malloc(sizeof(*cb)); + } else { + t->write_cb_pool = cb->next; + } + cb->call_at_byte = notify_offset; + cb->closure = s->fetching_send_message_finished; + s->fetching_send_message_finished = NULL; + cb->next = s->on_write_finished_cbs; + s->on_write_finished_cbs = cb; } - cb->call_at_byte = notify_offset; - cb->closure = s->fetching_send_message_finished; - s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; + s->fetching_send_message = NULL; + return; /* early out */ + } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, + &s->fetching_slice, UINT32_MAX, + &s->complete_fetch)) { + add_fetched_slice_locked(exec_ctx, t, s); } - s->fetching_send_message = NULL; - } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - &s->fetching_slice, UINT32_MAX, - &s->complete_fetch)) { - add_fetched_slice_locked(exec_ctx, t, s); } } -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - s->fetched_send_message_length += - (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); - gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - if (s->id != 0) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); - } - continue_fetching_send_locked(exec_ctx, t, s); -} - static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_error *error) { grpc_chttp2_stream *s = gs; grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); } else { /* TODO(ctiller): what to do here */ abort(); @@ -994,9 +1022,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - GPR_ASSERT(s->id == 0); - grpc_chttp2_list_add_waiting_for_concurrency(t, s); - maybe_start_some_streams(exec_ctx, t); + if (!t->closed) { + GPR_ASSERT(s->id == 0); + grpc_chttp2_list_add_waiting_for_concurrency(t, s); + maybe_start_some_streams(exec_ctx, t); + } else { + grpc_chttp2_cancel_stream(exec_ctx, t, s, + GRPC_ERROR_CREATE("Transport closed")); + } } else { GPR_ASSERT(s->id != 0); grpc_chttp2_become_writable(exec_ctx, t, s, true, @@ -1186,6 +1219,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_free(msg); } +static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_error_code error, gpr_slice data) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; + grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data, + &t->qbuf); + grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); +} + static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { @@ -1200,15 +1241,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_goaway) { - t->sent_goaway = 1; - grpc_chttp2_goaway_append( - t->last_new_stream_id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), - gpr_slice_ref(*op->goaway_message), &t->qbuf); - close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0 - ? GRPC_ERROR_CREATE("GOAWAY sent") - : GRPC_ERROR_NONE; - grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); + send_goaway(exec_ctx, t, + grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), + gpr_slice_ref(*op->goaway_message)); } if (op->set_accept_stream) { @@ -1342,10 +1377,14 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->data_parser.parsing_frame = NULL; } - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) { - close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING( - "Last stream closed after sending GOAWAY", &error, 1)); + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + post_benign_reclaimer(exec_ctx, t); + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) { + close_transport_locked( + exec_ctx, t, + GRPC_ERROR_CREATE_REFERENCING( + "Last stream closed after sending GOAWAY", &error, 1)); + } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); @@ -2073,6 +2112,103 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( } /******************************************************************************* + * RESOURCE QUOTAS + */ + +static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + if (!t->benign_reclaimer_registered) { + t->benign_reclaimer_registered = true; + GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), + false, &t->benign_reclaimer); + } +} + +static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + if (!t->destructive_reclaimer_registered) { + t->destructive_reclaimer_registered = true; + GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), + true, &t->destructive_reclaimer); + } +} + +static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked, + GRPC_ERROR_REF(error), false); +} + +static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked, + GRPC_ERROR_REF(error), false); +} + +static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } + send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, + gpr_slice_from_static_string("Buffers full")); + } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); + } + t->benign_reclaimer_registered = false; + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); +} + +static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } + grpc_chttp2_cancel_stream( + exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_CHTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + /* Since we cancel one stream per destructive reclamation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ + post_destructive_reclaimer(exec_ctx, t); + } + } + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); +} + +/******************************************************************************* * TRACING */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 01f521c360..e0c4a1e925 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -138,6 +138,12 @@ typedef enum { GRPC_NUM_SETTING_SETS } grpc_chttp2_setting_set; +typedef enum { + GRPC_CHTTP2_NO_GOAWAY_SEND, + GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED, + GRPC_CHTTP2_GOAWAY_SENT, +} grpc_chttp2_sent_goaway_state; + /* Outstanding ping request data */ typedef struct grpc_chttp2_outstanding_ping { uint8_t id[8]; @@ -249,7 +255,7 @@ struct grpc_chttp2_transport { /** have we seen a goaway */ uint8_t seen_goaway; /** have we sent a goaway */ - uint8_t sent_goaway; + grpc_chttp2_sent_goaway_state sent_goaway_state; /** are the local settings dirty and need to be sent? */ uint8_t dirtied_local_settings; @@ -320,6 +326,18 @@ struct grpc_chttp2_transport { /* if non-NULL, close the transport with this error when writes are finished */ grpc_error *close_transport_on_writes_finished; + + /* buffer pool state */ + /** have we scheduled a benign cleanup? */ + bool benign_reclaimer_registered; + /** have we scheduled a destructive cleanup? */ + bool destructive_reclaimer_registered; + /** benign cleanup closure */ + grpc_closure benign_reclaimer; + grpc_closure benign_reclaimer_locked; + /** destructive cleanup closure */ + grpc_closure destructive_reclaimer; + grpc_closure destructive_reclaimer_locked; }; typedef enum { diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c index 59b3a14e0a..5f5a28446d 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.c +++ b/src/core/ext/transport/chttp2/transport/stream_map.c @@ -151,6 +151,17 @@ size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map) { return map->count - map->free; } +void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map) { + if (map->count == map->free) { + return NULL; + } + if (map->free != 0) { + map->count = compact(map->keys, map->values, map->count); + map->free = 0; + } + return map->values[((size_t)rand()) % map->count]; +} + void grpc_chttp2_stream_map_for_each(grpc_chttp2_stream_map *map, void (*f)(void *user_data, uint32_t key, void *value), diff --git a/src/core/ext/transport/chttp2/transport/stream_map.h b/src/core/ext/transport/chttp2/transport/stream_map.h index e76312dd1a..203f640680 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.h +++ b/src/core/ext/transport/chttp2/transport/stream_map.h @@ -68,6 +68,9 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key); /* Return an existing key, or NULL if it does not exist */ void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, uint32_t key); +/* Return a random entry */ +void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map); + /* How many (populated) entries are in the stream map? */ size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map); |