aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c95
1 files changed, 66 insertions, 29 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index b9f511e946..19265252ca 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control(
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global);
-/*
+/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -432,6 +432,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
if (t->ep) {
allow_endpoint_shutdown_locked(exec_ctx, t);
}
+
+ /* flush writable stream list to avoid dangling references */
+ grpc_chttp2_stream_global *stream_global;
+ grpc_chttp2_stream_writing *stream_writing;
+ while (grpc_chttp2_list_pop_writable_stream(
+ &t->global, &t->writing, &stream_global, &stream_writing)) {
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
+ }
}
}
@@ -521,7 +529,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.id) == NULL);
}
- grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
@@ -583,7 +590,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
return &accepting->parsing;
}
-/*
+/*******************************************************************************
* LOCK MANAGEMENT
*/
@@ -611,10 +618,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_END("unlock", 0);
}
-/*
+/*******************************************************************************
* OUTPUT PROCESSING
*/
+void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
+}
+
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
@@ -732,7 +747,7 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++;
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -821,7 +836,7 @@ static void perform_stream_op_locked(
maybe_start_some_streams(exec_ctx, transport_global);
} else {
GPR_ASSERT(stream_global->id != 0);
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
} else {
grpc_chttp2_complete_closure_step(
@@ -838,7 +853,7 @@ static void perform_stream_op_locked(
exec_ctx, &stream_global->send_message_finished, 0);
} else if (stream_global->id != 0) {
stream_global->send_message = op->send_message;
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -858,7 +873,7 @@ static void perform_stream_op_locked(
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -944,12 +959,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
unlock(exec_ctx, t);
}
-static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_transport_op *op) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- int close_transport = 0;
-
- lock(t);
+static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_transport_op *op) {
+ bool close_transport = false;
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
@@ -968,8 +981,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
close_transport = !grpc_chttp2_has_streams(t);
}
- if (op->set_accept_stream != NULL) {
- t->channel_callback.accept_stream = op->set_accept_stream;
+ if (op->set_accept_stream) {
+ t->channel_callback.accept_stream = op->set_accept_stream_fn;
t->channel_callback.accept_stream_user_data =
op->set_accept_stream_user_data;
}
@@ -990,16 +1003,31 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
close_transport_locked(exec_ctx, t);
}
- unlock(exec_ctx, t);
-
if (close_transport) {
- lock(t);
close_transport_locked(exec_ctx, t);
- unlock(exec_ctx, t);
}
}
-/*
+static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_transport_op *op) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+
+ lock(t);
+
+ /* If there's a set_accept_stream ensure that we're not parsing
+ to avoid changing things out from underneath */
+ if (t->parsing_active && op->set_accept_stream) {
+ GPR_ASSERT(t->post_parsing_op == NULL);
+ t->post_parsing_op = gpr_malloc(sizeof(*op));
+ memcpy(t->post_parsing_op, op, sizeof(*op));
+ } else {
+ perform_transport_op_locked(exec_ctx, t, op);
+ }
+
+ unlock(exec_ctx, t);
+}
+
+/*******************************************************************************
* INPUT PROCESSING
*/
@@ -1064,7 +1092,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (!s) {
s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
}
- grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
GPR_ASSERT(s);
s->global.in_stream_map = 0;
if (t->parsing.incoming_stream == &s->parsing) {
@@ -1080,6 +1107,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t);
}
+ if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
+ }
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map);
@@ -1331,7 +1361,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -1392,6 +1422,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
/* handle higher level things */
grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
t->parsing_active = 0;
+ /* handle delayed transport ops (if there is one) */
+ if (t->post_parsing_op) {
+ grpc_transport_op *op = t->post_parsing_op;
+ t->post_parsing_op = NULL;
+ perform_transport_op_locked(exec_ctx, t, op);
+ gpr_free(op);
+ }
/* if a stream is in the stream map, and gets cancelled, we need to ensure
* we are not parsing before continuing the cancellation to keep things in
* a sane state */
@@ -1426,7 +1463,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
GPR_TIMER_END("recv_data", 0);
}
-/*
+/*******************************************************************************
* CALLBACK LOOP
*/
@@ -1440,7 +1477,7 @@ static void connectivity_state_set(
state, reason);
}
-/*
+/*******************************************************************************
* POLLSET STUFF
*/
@@ -1468,7 +1505,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
unlock(exec_ctx, t);
}
-/*
+/*******************************************************************************
* BYTE STREAM
*/
@@ -1508,7 +1545,7 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -1623,7 +1660,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
return incoming_byte_stream;
}
-/*
+/*******************************************************************************
* TRACING
*/
@@ -1709,7 +1746,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
gpr_free(prefix);
}
-/*
+/*******************************************************************************
* INTEGRATION GLUE
*/