aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-10-31 14:20:01 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-10-31 14:20:01 -0700
commit7e0c2f8301e6f76f5e8d38d1a3f891da20147c39 (patch)
treecf21380503e17debfbbcecb803af26a5d36897b3 /src/core/ext
parentcb356b26eaf8cfde85166a7f6a4b3b8b6b52f839 (diff)
parentccc6a9cbf264655ae6b60727cd86b987a62977c9 (diff)
Merge remote-tracking branch 'upstream/master' into lb_policy_name_channel_arg
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_channel/client_channel.c4
-rw-r--r--src/core/ext/client_channel/subchannel.c3
-rw-r--r--src/core/ext/client_channel/subchannel.h2
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c6
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c13
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c3
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c6
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c6
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c4
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c8
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c169
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h20
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_map.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_map.h3
15 files changed, 223 insertions, 38 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 80b4f048c2..ff773ac334 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -1022,6 +1022,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
gpr_mu_destroy(&calld->mu);
GPR_ASSERT(calld->waiting_ops_count == 0);
+ if (calld->connected_subchannel != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
+ "picked");
+ }
gpr_free(calld->waiting_ops);
gpr_free(and_free_memory);
}
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index ab6aede23a..789966cb69 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -183,9 +183,10 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c);
}
-void grpc_connected_subchannel_ref(
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ return c;
}
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 5652074074..93bd72d20d 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -97,7 +97,7 @@ grpc_subchannel *grpc_subchannel_weak_ref(
void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_connected_subchannel_ref(
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *channel
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 5d3433df74..ac3c6a305a 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -209,7 +209,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* Check atomically for a selected channel */
grpc_connected_subchannel *selected = GET_SELECTED(p);
if (selected != NULL) {
- *target = selected;
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
return 1;
}
@@ -218,7 +218,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
selected = GET_SELECTED(p);
if (selected) {
gpr_mu_unlock(&p->mu);
- *target = selected;
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
return 1;
} else {
if (!p->started_picking) {
@@ -310,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = selected;
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index c0743b00e8..37a9b18b97 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -397,7 +397,9 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
/* readily available, report right away */
- *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected->subchannel),
+ "picked");
if (user_data != NULL) {
*user_data = selected->user_data;
@@ -463,8 +465,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target =
- grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected->subchannel),
+ "picked");
if (pp->user_data != NULL) {
*pp->user_data = selected->user_data;
}
@@ -578,7 +581,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
gpr_mu_unlock(&p->mu);
- target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected->subchannel),
+ "picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 372fb7bf4c..71a06e118b 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -154,7 +154,8 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
c->tcp = NULL;
grpc_closure_init(&c->connected, connected, c);
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
- args->interested_parties, args->addr, args->deadline);
+ args->interested_parties, args->channel_args,
+ args->addr, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
index b2c5e5b088..1e5b1c22e3 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -44,6 +44,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_client_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -65,9 +66,8 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
int flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
- grpc_endpoint *client =
- grpc_tcp_create(grpc_fd_create(fd, "client"),
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client");
+ grpc_endpoint *client = grpc_tcp_client_create_from_fd(
+ &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport *transport =
grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 29095af207..57e1a8ec01 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -212,9 +212,9 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
GPR_ASSERT(c->connecting_endpoint == NULL);
gpr_mu_unlock(&c->mu);
grpc_closure_init(&c->connected_closure, connected, c);
- grpc_tcp_client_connect(exec_ctx, &c->connected_closure,
- &c->newly_connecting_endpoint,
- args->interested_parties, args->addr, args->deadline);
+ grpc_tcp_client_connect(
+ exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint,
+ args->interested_parties, args->channel_args, args->addr, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index e4967c2f08..d42611b863 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -139,8 +139,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- err =
- grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
+ err = grpc_tcp_server_create(&exec_ctx, NULL,
+ grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index 9af17fb5ae..aa2ecf5743 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -57,8 +57,12 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
char *name;
gpr_asprintf(&name, "fd:%d", fd);
- grpc_endpoint *server_endpoint = grpc_tcp_create(
- grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
+ grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args(
+ grpc_server_get_channel_args(server));
+ grpc_endpoint *server_endpoint =
+ grpc_tcp_create(grpc_fd_create(fd, name), resource_quota,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 0fab888541..7ad687042d 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -271,7 +271,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
memset(server_state, 0, sizeof(*server_state));
grpc_closure_init(&server_state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, server_state);
- err = grpc_tcp_server_create(&server_state->tcp_server_shutdown_complete,
+ err = grpc_tcp_server_create(&exec_ctx,
+ &server_state->tcp_server_shutdown_complete,
grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index ecf3aea870..4a9f806354 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -114,6 +114,20 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error);
+static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+
+static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_error *error);
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -241,6 +255,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
grpc_closure_init(&t->read_action_begin, read_action_begin, t);
grpc_closure_init(&t->read_action_locked, read_action_locked, t);
+ grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
+ grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
+ grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
+ grpc_closure_init(&t->destructive_reclaimer_locked,
+ destructive_reclaimer_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@@ -362,6 +381,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
+ post_benign_reclaimer(exec_ctx, t);
}
static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
@@ -467,6 +487,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(exec_ctx, t);
}
GPR_TIMER_END("init_stream", 0);
@@ -675,6 +696,13 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
}
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent"));
+ }
+ }
+
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->write_state) {
@@ -780,6 +808,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(exec_ctx, t);
grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
}
/* cancel out streams that will never be started */
@@ -993,9 +1022,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->write_closed) {
if (t->is_client) {
- GPR_ASSERT(s->id == 0);
- grpc_chttp2_list_add_waiting_for_concurrency(t, s);
- maybe_start_some_streams(exec_ctx, t);
+ if (!t->closed) {
+ GPR_ASSERT(s->id == 0);
+ grpc_chttp2_list_add_waiting_for_concurrency(t, s);
+ maybe_start_some_streams(exec_ctx, t);
+ } else {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ GRPC_ERROR_CREATE("Transport closed"));
+ }
} else {
GPR_ASSERT(s->id != 0);
grpc_chttp2_become_writable(exec_ctx, t, s, true,
@@ -1185,6 +1219,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_free(msg);
}
+static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_error_code error, gpr_slice data) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
+ grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data,
+ &t->qbuf);
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+}
+
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
@@ -1199,15 +1241,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_goaway) {
- t->sent_goaway = 1;
- grpc_chttp2_goaway_append(
- t->last_new_stream_id,
- (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
- gpr_slice_ref(*op->goaway_message), &t->qbuf);
- close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0
- ? GRPC_ERROR_CREATE("GOAWAY sent")
- : GRPC_ERROR_NONE;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ send_goaway(exec_ctx, t,
+ grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
+ gpr_slice_ref(*op->goaway_message));
}
if (op->set_accept_stream) {
@@ -1341,10 +1377,14 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->data_parser.parsing_frame = NULL;
}
- if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) {
- close_transport_locked(
- exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING(
- "Last stream closed after sending GOAWAY", &error, 1));
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ post_benign_reclaimer(exec_ctx, t);
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Last stream closed after sending GOAWAY", &error, 1));
+ }
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
@@ -2072,6 +2112,103 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
}
/*******************************************************************************
+ * RESOURCE QUOTAS
+ */
+
+static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ if (!t->benign_reclaimer_registered) {
+ t->benign_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
+ false, &t->benign_reclaimer);
+ }
+}
+
+static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ if (!t->destructive_reclaimer_registered) {
+ t->destructive_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
+ true, &t->destructive_reclaimer);
+ }
+}
+
+static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ if (error == GRPC_ERROR_NONE &&
+ grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ /* Channel with no active streams: send a goaway to try and make it
+ * disconnect cleanly */
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
+ t->peer_string);
+ }
+ send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
+ gpr_slice_from_static_string("Buffers full"));
+ } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG,
+ "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
+ " streams",
+ t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
+ }
+ t->benign_reclaimer_registered = false;
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
+}
+
+static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
+ t->destructive_reclaimer_registered = false;
+ if (error == GRPC_ERROR_NONE && n > 0) {
+ grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map);
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string,
+ s->id);
+ }
+ grpc_chttp2_cancel_stream(
+ exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ GRPC_CHTTP2_ENHANCE_YOUR_CALM));
+ if (n > 1) {
+ /* Since we cancel one stream per destructive reclamation, if
+ there are more streams left, we can immediately post a new
+ reclaimer in case the resource quota needs to free more
+ memory */
+ post_destructive_reclaimer(exec_ctx, t);
+ }
+ }
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
+}
+
+/*******************************************************************************
* TRACING
*/
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 01f521c360..e0c4a1e925 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -138,6 +138,12 @@ typedef enum {
GRPC_NUM_SETTING_SETS
} grpc_chttp2_setting_set;
+typedef enum {
+ GRPC_CHTTP2_NO_GOAWAY_SEND,
+ GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
+ GRPC_CHTTP2_GOAWAY_SENT,
+} grpc_chttp2_sent_goaway_state;
+
/* Outstanding ping request data */
typedef struct grpc_chttp2_outstanding_ping {
uint8_t id[8];
@@ -249,7 +255,7 @@ struct grpc_chttp2_transport {
/** have we seen a goaway */
uint8_t seen_goaway;
/** have we sent a goaway */
- uint8_t sent_goaway;
+ grpc_chttp2_sent_goaway_state sent_goaway_state;
/** are the local settings dirty and need to be sent? */
uint8_t dirtied_local_settings;
@@ -320,6 +326,18 @@ struct grpc_chttp2_transport {
/* if non-NULL, close the transport with this error when writes are finished
*/
grpc_error *close_transport_on_writes_finished;
+
+ /* buffer pool state */
+ /** have we scheduled a benign cleanup? */
+ bool benign_reclaimer_registered;
+ /** have we scheduled a destructive cleanup? */
+ bool destructive_reclaimer_registered;
+ /** benign cleanup closure */
+ grpc_closure benign_reclaimer;
+ grpc_closure benign_reclaimer_locked;
+ /** destructive cleanup closure */
+ grpc_closure destructive_reclaimer;
+ grpc_closure destructive_reclaimer_locked;
};
typedef enum {
diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c
index 59b3a14e0a..5f5a28446d 100644
--- a/src/core/ext/transport/chttp2/transport/stream_map.c
+++ b/src/core/ext/transport/chttp2/transport/stream_map.c
@@ -151,6 +151,17 @@ size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map) {
return map->count - map->free;
}
+void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map) {
+ if (map->count == map->free) {
+ return NULL;
+ }
+ if (map->free != 0) {
+ map->count = compact(map->keys, map->values, map->count);
+ map->free = 0;
+ }
+ return map->values[((size_t)rand()) % map->count];
+}
+
void grpc_chttp2_stream_map_for_each(grpc_chttp2_stream_map *map,
void (*f)(void *user_data, uint32_t key,
void *value),
diff --git a/src/core/ext/transport/chttp2/transport/stream_map.h b/src/core/ext/transport/chttp2/transport/stream_map.h
index e76312dd1a..203f640680 100644
--- a/src/core/ext/transport/chttp2/transport/stream_map.h
+++ b/src/core/ext/transport/chttp2/transport/stream_map.h
@@ -68,6 +68,9 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key);
/* Return an existing key, or NULL if it does not exist */
void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, uint32_t key);
+/* Return a random entry */
+void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map);
+
/* How many (populated) entries are in the stream map? */
size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map);