aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c43
1 files changed, 31 insertions, 12 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index a1ca78d4c4..369ff0ad7f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -951,12 +951,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
unlock(exec_ctx, t);
}
-static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_transport_op *op) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- int close_transport = 0;
-
- lock(t);
+static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_transport_op *op) {
+ bool close_transport = false;
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
@@ -975,8 +973,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
close_transport = !grpc_chttp2_has_streams(t);
}
- if (op->set_accept_stream != NULL) {
- t->channel_callback.accept_stream = op->set_accept_stream;
+ if (op->set_accept_stream) {
+ t->channel_callback.accept_stream = op->set_accept_stream_fn;
t->channel_callback.accept_stream_user_data =
op->set_accept_stream_user_data;
}
@@ -997,15 +995,29 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
close_transport_locked(exec_ctx, t);
}
- unlock(exec_ctx, t);
-
if (close_transport) {
- lock(t);
close_transport_locked(exec_ctx, t);
- unlock(exec_ctx, t);
}
}
+static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_transport_op *op) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+
+ lock(t);
+
+ /* Let's be overly cautious: don't change any state while we're parsing */
+ if (t->parsing_active) {
+ GPR_ASSERT(t->post_parsing_op == NULL);
+ t->post_parsing_op = gpr_malloc(sizeof(*op));
+ memcpy(t->post_parsing_op, op, sizeof(*op));
+ } else {
+ perform_transport_op_locked(exec_ctx, t, op);
+ }
+
+ unlock(exec_ctx, t);
+}
+
/*******************************************************************************
* INPUT PROCESSING
*/
@@ -1401,6 +1413,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
/* handle higher level things */
grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
t->parsing_active = 0;
+ /* handle delayed transport ops (if there is one) */
+ if (t->post_parsing_op) {
+ grpc_transport_op *op = t->post_parsing_op;
+ t->post_parsing_op = NULL;
+ perform_transport_op_locked(exec_ctx, t, op);
+ gpr_free(op);
+ }
/* if a stream is in the stream map, and gets cancelled, we need to ensure
* we are not parsing before continuing the cancellation to keep things in
* a sane state */