aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-11-02 12:06:57 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-11-02 12:06:57 -0700
commit1c0b7a2d180f44012b302eb17ce103f6b4d45752 (patch)
tree977457c5f4aac78f1e364d7e64adf5f0e2e8b869 /src/core/ext/transport
parent68413c221e341f4f00326e892077c5fcd4b4f788 (diff)
parent11948f74414e6c95b81fbcc2f0d06afa0b1cbce5 (diff)
Merge remote-tracking branch 'upstream/master' into get_tos
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/alpn/alpn.c2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c73
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c6
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c84
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c8
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c8
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c246
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h20
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_map.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_map.h3
11 files changed, 304 insertions, 164 deletions
diff --git a/src/core/ext/transport/chttp2/alpn/alpn.c b/src/core/ext/transport/chttp2/alpn/alpn.c
index 48b0217265..55710dc5ae 100644
--- a/src/core/ext/transport/chttp2/alpn/alpn.c
+++ b/src/core/ext/transport/chttp2/alpn/alpn.c
@@ -36,7 +36,7 @@
#include <grpc/support/useful.h>
/* in order of preference */
-static const char *const supported_versions[] = {"h2"};
+static const char *const supported_versions[] = {"grpc-exp", "h2"};
int grpc_chttp2_is_alpn_version_supported(const char *version, size_t size) {
size_t i;
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index c2b59569fd..71a06e118b 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -40,9 +40,9 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/ext/client_config/client_channel.h"
-#include "src/core/ext/client_config/http_connect_handshaker.h"
-#include "src/core/ext/client_config/resolver_registry.h"
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/compress_filter.h"
@@ -52,6 +52,10 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
+//
+// connector
+//
+
typedef struct {
grpc_connector base;
gpr_refcount refs;
@@ -150,42 +154,27 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
c->tcp = NULL;
grpc_closure_init(&c->connected, connected, c);
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
- args->interested_parties, args->addr, args->addr_len,
- args->deadline);
+ args->interested_parties, args->channel_args,
+ args->addr, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
-typedef struct {
- grpc_client_channel_factory base;
- gpr_refcount refs;
- grpc_channel_args *merge_args;
-} client_channel_factory;
+//
+// client_channel_factory
+//
static void client_channel_factory_ref(
- grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- gpr_ref(&f->refs);
-}
+ grpc_client_channel_factory *cc_factory) {}
static void client_channel_factory_unref(
- grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- if (gpr_unref(&f->refs)) {
- grpc_channel_args_destroy(f->merge_args);
- gpr_free(f);
- }
-}
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
- grpc_subchannel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
+ const grpc_subchannel_args *args) {
connector *c = gpr_malloc(sizeof(*c));
- grpc_channel_args *final_args =
- grpc_channel_args_merge(args->args, f->merge_args);
- grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
@@ -197,23 +186,18 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_http_connect_handshaker_create(proxy_name, args->server_name));
gpr_free(proxy_name);
}
- args->args = final_args;
- s = grpc_subchannel_create(exec_ctx, &c->base, args);
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
- grpc_channel_args_destroy(final_args);
return s;
}
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
- grpc_channel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
- grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
- GRPC_CLIENT_CHANNEL, NULL);
- grpc_channel_args_destroy(final_args);
- grpc_resolver *resolver = grpc_resolver_create(target);
+ const grpc_channel_args *args) {
+ grpc_channel *channel =
+ grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
+ grpc_resolver *resolver = grpc_resolver_create(target, args);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
@@ -221,7 +205,7 @@ static grpc_channel *client_channel_factory_create_channel(
}
grpc_client_channel_finish_initialization(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
return channel;
@@ -232,6 +216,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
+static grpc_client_channel_factory client_channel_factory = {
+ &client_channel_factory_vtable};
+
/* Create a client channel:
Asynchronously: - resolve target
- connect to it (trying alternatives as presented)
@@ -245,16 +232,12 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
(target, args, reserved));
GPR_ASSERT(!reserved);
- client_channel_factory *f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
- gpr_ref_init(&f->refs, 1);
- f->merge_args = grpc_channel_args_copy(args);
-
+ grpc_client_channel_factory *factory =
+ (grpc_client_channel_factory *)&client_channel_factory;
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args);
- grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+ grpc_client_channel_factory_unref(&exec_ctx, factory);
grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create(
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
index b2c5e5b088..1e5b1c22e3 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -44,6 +44,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_client_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -65,9 +66,8 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
int flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
- grpc_endpoint *client =
- grpc_tcp_create(grpc_fd_create(fd, "client"),
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client");
+ grpc_endpoint *client = grpc_tcp_client_create_from_fd(
+ &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport *transport =
grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 31c54ff74c..d0ac72a011 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -40,9 +40,9 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/ext/client_config/client_channel.h"
-#include "src/core/ext/client_config/http_connect_handshaker.h"
-#include "src/core/ext/client_config/resolver_registry.h"
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
@@ -54,6 +54,10 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/tsi/transport_security_interface.h"
+//
+// connector
+//
+
typedef struct {
grpc_connector base;
gpr_refcount refs;
@@ -210,16 +214,19 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
grpc_closure_init(&c->connected_closure, connected, c);
grpc_tcp_client_connect(
exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint,
- args->interested_parties, args->addr, args->addr_len, args->deadline);
+ args->interested_parties, args->channel_args, args->addr, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
+//
+// client_channel_factory
+//
+
typedef struct {
grpc_client_channel_factory base;
gpr_refcount refs;
- grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
} client_channel_factory;
@@ -235,19 +242,15 @@ static void client_channel_factory_unref(
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory");
- grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
- grpc_subchannel_args *args) {
+ const grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
- grpc_channel_args *final_args =
- grpc_channel_args_merge(args->args, f->merge_args);
- grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
@@ -261,25 +264,19 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
}
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
- args->args = final_args;
- s = grpc_subchannel_create(exec_ctx, &c->base, args);
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
- grpc_channel_args_destroy(final_args);
return s;
}
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
- grpc_channel_args *args) {
+ const grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
-
- grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
- grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
- GRPC_CLIENT_CHANNEL, NULL);
- grpc_channel_args_destroy(final_args);
-
- grpc_resolver *resolver = grpc_resolver_create(target);
+ grpc_channel *channel =
+ grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
+ grpc_resolver *resolver = grpc_resolver_create(target, args);
if (resolver != NULL) {
grpc_client_channel_finish_initialization(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
@@ -289,9 +286,6 @@ static grpc_channel *client_channel_factory_create_channel(
"client_channel_factory_create_channel");
channel = NULL;
}
-
- GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
- "client_channel_factory_create_channel");
return channel;
}
@@ -308,19 +302,13 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_arg connector_arg;
- grpc_channel_args *args_copy;
- grpc_channel_args *new_args_from_connector;
- grpc_channel_security_connector *security_connector;
- client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
GRPC_API_TRACE(
"grpc_secure_channel_create(creds=%p, target=%s, args=%p, "
"reserved=%p)",
4, (creds, target, args, reserved));
GPR_ASSERT(reserved == NULL);
-
+ // Make sure security connector does not already exist in args.
if (grpc_find_security_connector_in_args(args) != NULL) {
gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
grpc_exec_ctx_finish(&exec_ctx);
@@ -328,7 +316,9 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
target, GRPC_STATUS_INTERNAL,
"Security connector exists in channel args.");
}
-
+ // Create security connector and construct new channel args.
+ grpc_channel_security_connector *security_connector;
+ grpc_channel_args *new_args_from_connector;
if (grpc_channel_credentials_create_security_connector(
creds, target, args, &security_connector, &new_args_from_connector) !=
GRPC_SECURITY_OK) {
@@ -336,32 +326,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
}
-
- connector_arg = grpc_security_connector_to_arg(&security_connector->base);
- args_copy = grpc_channel_args_copy_and_add(
+ grpc_arg connector_arg =
+ grpc_security_connector_to_arg(&security_connector->base);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
-
- f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
- gpr_ref_init(&f->refs, 1);
-
- f->merge_args = grpc_channel_args_copy(args_copy);
- grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
-
+ // Create client channel factory.
+ client_channel_factory *f = gpr_malloc(sizeof(*f));
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
+ gpr_ref_init(&f->refs, 1);
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
"grpc_secure_channel_create");
f->security_connector = security_connector;
-
+ // Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
-
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
+ // Clean up.
+ GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ "secure_client_channel_factory_create_channel");
+ grpc_channel_args_destroy(new_args);
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
-
return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index f0e07429fa..d42611b863 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -139,8 +139,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- err =
- grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
+ err = grpc_tcp_server_create(&exec_ctx, NULL,
+ grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
@@ -148,9 +148,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
const size_t naddrs = resolved->naddrs;
errors = gpr_malloc(sizeof(*errors) * naddrs);
for (i = 0; i < naddrs; i++) {
- errors[i] = grpc_tcp_server_add_port(
- tcp, (struct sockaddr *)&resolved->addrs[i].addr,
- resolved->addrs[i].len, &port_temp);
+ errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp);
if (errors[i] == GRPC_ERROR_NONE) {
if (port_num == -1) {
port_num = port_temp;
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index 9af17fb5ae..aa2ecf5743 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -57,8 +57,12 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
char *name;
gpr_asprintf(&name, "fd:%d", fd);
- grpc_endpoint *server_endpoint = grpc_tcp_create(
- grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
+ grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args(
+ grpc_server_get_channel_args(server));
+ grpc_endpoint *server_endpoint =
+ grpc_tcp_create(grpc_fd_create(fd, name), resource_quota,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 563271f4f8..7ad687042d 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -271,7 +271,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
memset(server_state, 0, sizeof(*server_state));
grpc_closure_init(&server_state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, server_state);
- err = grpc_tcp_server_create(&server_state->tcp_server_shutdown_complete,
+ err = grpc_tcp_server_create(&exec_ctx,
+ &server_state->tcp_server_shutdown_complete,
grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
@@ -286,9 +287,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
for (i = 0; i < resolved->naddrs; i++) {
- errors[i] = grpc_tcp_server_add_port(
- tcp, (struct sockaddr *)&resolved->addrs[i].addr,
- resolved->addrs[i].len, &port_temp);
+ errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp);
if (errors[i] == GRPC_ERROR_NONE) {
if (port_num == -1) {
port_num = port_temp;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 9f7bc818b6..b61a3a0314 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -114,6 +114,20 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error);
+static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+
+static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_error *error);
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -241,6 +255,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
grpc_closure_init(&t->read_action_begin, read_action_begin, t);
grpc_closure_init(&t->read_action_locked, read_action_locked, t);
+ grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
+ grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
+ grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
+ grpc_closure_init(&t->destructive_reclaimer_locked,
+ destructive_reclaimer_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@@ -362,6 +381,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
+ post_benign_reclaimer(exec_ctx, t);
}
static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
@@ -467,6 +487,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(exec_ctx, t);
}
GPR_TIMER_END("init_stream", 0);
@@ -675,6 +696,13 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
}
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent"));
+ }
+ }
+
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->write_state) {
@@ -780,6 +808,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(exec_ctx, t);
grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
}
/* cancel out streams that will never be started */
@@ -853,61 +882,60 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) {
static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
+ grpc_chttp2_stream *s) {
+ s->fetched_send_message_length +=
+ (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice);
+ gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
+ if (s->id != 0) {
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ }
+}
static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- if (s->fetching_send_message == NULL) {
- /* Stream was cancelled before message fetch completed */
- abort(); /* TODO(ctiller): what cleanup here? */
- return;
- }
- if (s->fetched_send_message_length == s->fetching_send_message->length) {
- int64_t notify_offset = s->next_message_end_offset;
- if (notify_offset <= s->flow_controlled_bytes_written) {
- grpc_chttp2_complete_closure_step(
- exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
- "fetching_send_message_finished");
- } else {
- grpc_chttp2_write_cb *cb = t->write_cb_pool;
- if (cb == NULL) {
- cb = gpr_malloc(sizeof(*cb));
+ for (;;) {
+ if (s->fetching_send_message == NULL) {
+ /* Stream was cancelled before message fetch completed */
+ abort(); /* TODO(ctiller): what cleanup here? */
+ return; /* early out */
+ }
+ if (s->fetched_send_message_length == s->fetching_send_message->length) {
+ int64_t notify_offset = s->next_message_end_offset;
+ if (notify_offset <= s->flow_controlled_bytes_written) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
+ "fetching_send_message_finished");
} else {
- t->write_cb_pool = cb->next;
+ grpc_chttp2_write_cb *cb = t->write_cb_pool;
+ if (cb == NULL) {
+ cb = gpr_malloc(sizeof(*cb));
+ } else {
+ t->write_cb_pool = cb->next;
+ }
+ cb->call_at_byte = notify_offset;
+ cb->closure = s->fetching_send_message_finished;
+ s->fetching_send_message_finished = NULL;
+ cb->next = s->on_write_finished_cbs;
+ s->on_write_finished_cbs = cb;
}
- cb->call_at_byte = notify_offset;
- cb->closure = s->fetching_send_message_finished;
- s->fetching_send_message_finished = NULL;
- cb->next = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb;
+ s->fetching_send_message = NULL;
+ return; /* early out */
+ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice, UINT32_MAX,
+ &s->complete_fetch)) {
+ add_fetched_slice_locked(exec_ctx, t, s);
}
- s->fetching_send_message = NULL;
- } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
- &s->fetching_slice, UINT32_MAX,
- &s->complete_fetch)) {
- add_fetched_slice_locked(exec_ctx, t, s);
}
}
-static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- s->fetched_send_message_length +=
- (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice);
- gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
- continue_fetching_send_locked(exec_ctx, t, s);
-}
-
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error) {
grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) {
add_fetched_slice_locked(exec_ctx, t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
} else {
/* TODO(ctiller): what to do here */
abort();
@@ -994,9 +1022,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->write_closed) {
if (t->is_client) {
- GPR_ASSERT(s->id == 0);
- grpc_chttp2_list_add_waiting_for_concurrency(t, s);
- maybe_start_some_streams(exec_ctx, t);
+ if (!t->closed) {
+ GPR_ASSERT(s->id == 0);
+ grpc_chttp2_list_add_waiting_for_concurrency(t, s);
+ maybe_start_some_streams(exec_ctx, t);
+ } else {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ GRPC_ERROR_CREATE("Transport closed"));
+ }
} else {
GPR_ASSERT(s->id != 0);
grpc_chttp2_become_writable(exec_ctx, t, s, true,
@@ -1186,6 +1219,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_free(msg);
}
+static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_error_code error, gpr_slice data) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
+ grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data,
+ &t->qbuf);
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+}
+
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
@@ -1200,15 +1241,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_goaway) {
- t->sent_goaway = 1;
- grpc_chttp2_goaway_append(
- t->last_new_stream_id,
- (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
- gpr_slice_ref(*op->goaway_message), &t->qbuf);
- close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0
- ? GRPC_ERROR_CREATE("GOAWAY sent")
- : GRPC_ERROR_NONE;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ send_goaway(exec_ctx, t,
+ grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
+ gpr_slice_ref(*op->goaway_message));
}
if (op->set_accept_stream) {
@@ -1342,10 +1377,14 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->data_parser.parsing_frame = NULL;
}
- if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) {
- close_transport_locked(
- exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING(
- "Last stream closed after sending GOAWAY", &error, 1));
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ post_benign_reclaimer(exec_ctx, t);
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Last stream closed after sending GOAWAY", &error, 1));
+ }
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
@@ -2073,6 +2112,103 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
}
/*******************************************************************************
+ * RESOURCE QUOTAS
+ */
+
+static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ if (!t->benign_reclaimer_registered) {
+ t->benign_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
+ false, &t->benign_reclaimer);
+ }
+}
+
+static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ if (!t->destructive_reclaimer_registered) {
+ t->destructive_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
+ true, &t->destructive_reclaimer);
+ }
+}
+
+static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ if (error == GRPC_ERROR_NONE &&
+ grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ /* Channel with no active streams: send a goaway to try and make it
+ * disconnect cleanly */
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
+ t->peer_string);
+ }
+ send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
+ gpr_slice_from_static_string("Buffers full"));
+ } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG,
+ "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
+ " streams",
+ t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
+ }
+ t->benign_reclaimer_registered = false;
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
+}
+
+static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
+ t->destructive_reclaimer_registered = false;
+ if (error == GRPC_ERROR_NONE && n > 0) {
+ grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map);
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string,
+ s->id);
+ }
+ grpc_chttp2_cancel_stream(
+ exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ GRPC_CHTTP2_ENHANCE_YOUR_CALM));
+ if (n > 1) {
+ /* Since we cancel one stream per destructive reclamation, if
+ there are more streams left, we can immediately post a new
+ reclaimer in case the resource quota needs to free more
+ memory */
+ post_destructive_reclaimer(exec_ctx, t);
+ }
+ }
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
+}
+
+/*******************************************************************************
* TRACING
*/
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 01f521c360..e0c4a1e925 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -138,6 +138,12 @@ typedef enum {
GRPC_NUM_SETTING_SETS
} grpc_chttp2_setting_set;
+typedef enum {
+ GRPC_CHTTP2_NO_GOAWAY_SEND,
+ GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
+ GRPC_CHTTP2_GOAWAY_SENT,
+} grpc_chttp2_sent_goaway_state;
+
/* Outstanding ping request data */
typedef struct grpc_chttp2_outstanding_ping {
uint8_t id[8];
@@ -249,7 +255,7 @@ struct grpc_chttp2_transport {
/** have we seen a goaway */
uint8_t seen_goaway;
/** have we sent a goaway */
- uint8_t sent_goaway;
+ grpc_chttp2_sent_goaway_state sent_goaway_state;
/** are the local settings dirty and need to be sent? */
uint8_t dirtied_local_settings;
@@ -320,6 +326,18 @@ struct grpc_chttp2_transport {
/* if non-NULL, close the transport with this error when writes are finished
*/
grpc_error *close_transport_on_writes_finished;
+
+ /* buffer pool state */
+ /** have we scheduled a benign cleanup? */
+ bool benign_reclaimer_registered;
+ /** have we scheduled a destructive cleanup? */
+ bool destructive_reclaimer_registered;
+ /** benign cleanup closure */
+ grpc_closure benign_reclaimer;
+ grpc_closure benign_reclaimer_locked;
+ /** destructive cleanup closure */
+ grpc_closure destructive_reclaimer;
+ grpc_closure destructive_reclaimer_locked;
};
typedef enum {
diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c
index 59b3a14e0a..5f5a28446d 100644
--- a/src/core/ext/transport/chttp2/transport/stream_map.c
+++ b/src/core/ext/transport/chttp2/transport/stream_map.c
@@ -151,6 +151,17 @@ size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map) {
return map->count - map->free;
}
+void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map) {
+ if (map->count == map->free) {
+ return NULL;
+ }
+ if (map->free != 0) {
+ map->count = compact(map->keys, map->values, map->count);
+ map->free = 0;
+ }
+ return map->values[((size_t)rand()) % map->count];
+}
+
void grpc_chttp2_stream_map_for_each(grpc_chttp2_stream_map *map,
void (*f)(void *user_data, uint32_t key,
void *value),
diff --git a/src/core/ext/transport/chttp2/transport/stream_map.h b/src/core/ext/transport/chttp2/transport/stream_map.h
index e76312dd1a..203f640680 100644
--- a/src/core/ext/transport/chttp2/transport/stream_map.h
+++ b/src/core/ext/transport/chttp2/transport/stream_map.h
@@ -68,6 +68,9 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key);
/* Return an existing key, or NULL if it does not exist */
void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, uint32_t key);
+/* Return a random entry */
+void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map);
+
/* How many (populated) entries are in the stream map? */
size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map);