aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c8
-rw-r--r--src/core/transport/byte_stream.c6
-rw-r--r--src/core/transport/byte_stream.h4
-rw-r--r--src/core/transport/chttp2/frame_data.c2
-rw-r--r--src/core/transport/chttp2_transport.c24
5 files changed, 23 insertions, 21 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 84b9daaa28..79295ae0ff 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -367,7 +367,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
&c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
if (c->receiving_stream != NULL) {
- grpc_byte_stream_destroy(c->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
@@ -951,7 +951,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
@@ -1067,7 +1067,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_channel_get_max_message_length(call->channel)) {
cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
"Max message size exceeded");
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
*call->receiving_buffer = NULL;
if (gpr_unref(&bctl->steps_to_complete)) {
@@ -1356,7 +1356,7 @@ done_with_error:
}
if (bctl->send_message) {
call->sending_message = 0;
- grpc_byte_stream_destroy(&call->sending_stream.base);
+ grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
}
if (bctl->send_final_op) {
call->sent_final_op = 0;
diff --git a/src/core/transport/byte_stream.c b/src/core/transport/byte_stream.c
index 81e8e77ccb..4e8f589623 100644
--- a/src/core/transport/byte_stream.c
+++ b/src/core/transport/byte_stream.c
@@ -44,8 +44,8 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
on_complete);
}
-void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) {
- byte_stream->destroy(byte_stream);
+void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) {
+ byte_stream->destroy(exec_ctx, byte_stream);
}
/* slice_buffer_stream */
@@ -61,7 +61,7 @@ static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
return 1;
}
-static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) {}
+static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) {}
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
gpr_slice_buffer *slice_buffer,
diff --git a/src/core/transport/byte_stream.h b/src/core/transport/byte_stream.h
index c94d8ff275..558147f6f0 100644
--- a/src/core/transport/byte_stream.h
+++ b/src/core/transport/byte_stream.h
@@ -52,7 +52,7 @@ struct grpc_byte_stream {
int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
grpc_closure *on_complete);
- void (*destroy)(grpc_byte_stream *byte_stream);
+ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
};
/* returns 1 if the bytes are available immediately (in which case
@@ -72,7 +72,7 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, gpr_slice *slice,
size_t max_size_hint, grpc_closure *on_complete);
-void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream);
+void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
/* grpc_byte_stream that wraps a slice buffer */
typedef struct grpc_slice_buffer_stream {
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 38fa990758..08cc760aba 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -57,7 +57,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
}
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
- grpc_byte_stream_destroy(bs);
+ grpc_byte_stream_destroy(exec_ctx, bs);
}
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 3e88f69abf..cfaa96772c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -529,7 +529,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- grpc_byte_stream_destroy(bs);
+ grpc_byte_stream_destroy(exec_ctx, bs);
}
GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
@@ -992,7 +992,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
while (stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- grpc_byte_stream_destroy(bs);
+ grpc_byte_stream_destroy(exec_ctx, bs);
}
if (stream_global->incoming_frames.head == NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1474,8 +1474,17 @@ static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
}
}
-static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) {
- incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
+static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) {
+ grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ if (incoming_byte_stream->base.length == 0 && incoming_byte_stream->is_tail) {
+ lock(incoming_byte_stream->transport);
+ incoming_byte_stream_update_flow_control(
+ &incoming_byte_stream->transport->global,
+ &incoming_byte_stream->stream->global, 0, 0);
+ unlock(exec_ctx, incoming_byte_stream->transport);
+ }
+ incoming_byte_stream_unref(incoming_byte_stream);
}
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
@@ -1521,13 +1530,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
add_to_queue->tail->next_message = incoming_byte_stream;
}
add_to_queue->tail = incoming_byte_stream;
- if (frame_size == 0) {
- lock(TRANSPORT_FROM_PARSING(transport_parsing));
- incoming_byte_stream_update_flow_control(
- &TRANSPORT_FROM_PARSING(transport_parsing)->global,
- &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
- unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
- }
return incoming_byte_stream;
}