diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/call.c | 8 | ||||
-rw-r--r-- | src/core/transport/byte_stream.c | 6 | ||||
-rw-r--r-- | src/core/transport/byte_stream.h | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 24 |
5 files changed, 23 insertions, 21 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 84b9daaa28..79295ae0ff 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -367,7 +367,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } if (c->receiving_stream != NULL) { - grpc_byte_stream_destroy(c->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); @@ -951,7 +951,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, (*call->receiving_buffer)->data.raw.slice_buffer.length; if (remaining == 0) { call->receiving_message = 0; - grpc_byte_stream_destroy(call->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); @@ -1067,7 +1067,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_channel_get_max_message_length(call->channel)) { cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, "Max message size exceeded"); - grpc_byte_stream_destroy(call->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; *call->receiving_buffer = NULL; if (gpr_unref(&bctl->steps_to_complete)) { @@ -1356,7 +1356,7 @@ done_with_error: } if (bctl->send_message) { call->sending_message = 0; - grpc_byte_stream_destroy(&call->sending_stream.base); + grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); } if (bctl->send_final_op) { call->sent_final_op = 0; diff --git a/src/core/transport/byte_stream.c b/src/core/transport/byte_stream.c index 81e8e77ccb..4e8f589623 100644 --- a/src/core/transport/byte_stream.c +++ b/src/core/transport/byte_stream.c @@ -44,8 +44,8 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, on_complete); } -void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) { - byte_stream->destroy(byte_stream); +void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { + byte_stream->destroy(exec_ctx, byte_stream); } /* slice_buffer_stream */ @@ -61,7 +61,7 @@ static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, return 1; } -static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) {} +static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) {} void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, gpr_slice_buffer *slice_buffer, diff --git a/src/core/transport/byte_stream.h b/src/core/transport/byte_stream.h index c94d8ff275..558147f6f0 100644 --- a/src/core/transport/byte_stream.h +++ b/src/core/transport/byte_stream.h @@ -52,7 +52,7 @@ struct grpc_byte_stream { int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, grpc_closure *on_complete); - void (*destroy)(grpc_byte_stream *byte_stream); + void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); }; /* returns 1 if the bytes are available immediately (in which case @@ -72,7 +72,7 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, grpc_closure *on_complete); -void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream); +void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); /* grpc_byte_stream that wraps a slice buffer */ typedef struct grpc_slice_buffer_stream { diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 38fa990758..08cc760aba 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -57,7 +57,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { - grpc_byte_stream_destroy(bs); + grpc_byte_stream_destroy(exec_ctx, bs); } } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3e88f69abf..cfaa96772c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -529,7 +529,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { - grpc_byte_stream_destroy(bs); + grpc_byte_stream_destroy(exec_ctx, bs); } GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); @@ -992,7 +992,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while (stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - grpc_byte_stream_destroy(bs); + grpc_byte_stream_destroy(exec_ctx, bs); } if (stream_global->incoming_frames.head == NULL) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -1474,8 +1474,17 @@ static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { } } -static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) { - incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); +static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { + grpc_chttp2_incoming_byte_stream *incoming_byte_stream = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + if (incoming_byte_stream->base.length == 0 && incoming_byte_stream->is_tail) { + lock(incoming_byte_stream->transport); + incoming_byte_stream_update_flow_control( + &incoming_byte_stream->transport->global, + &incoming_byte_stream->stream->global, 0, 0); + unlock(exec_ctx, incoming_byte_stream->transport); + } + incoming_byte_stream_unref(incoming_byte_stream); } void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, @@ -1521,13 +1530,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( add_to_queue->tail->next_message = incoming_byte_stream; } add_to_queue->tail = incoming_byte_stream; - if (frame_size == 0) { - lock(TRANSPORT_FROM_PARSING(transport_parsing)); - incoming_byte_stream_update_flow_control( - &TRANSPORT_FROM_PARSING(transport_parsing)->global, - &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0); - unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing)); - } return incoming_byte_stream; } |