aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-12-18 11:40:49 -0800
committerGravatar Yang Gao <yangg@google.com>2015-12-18 11:40:49 -0800
commit11169d9a2c8301f9d8205ab8a2fb7582530156bc (patch)
tree876e671713caac129ffff1b5ab2967d2527ccca8 /src
parentf5be46d060fae3330408c1af7db236bd9d7f7e82 (diff)
parenta3f298ff67e8ee3ada117de136ea2469a146dc5e (diff)
Merge pull request #4384 from ctiller/big_data
Add a test that overflows incoming flow control windows
Diffstat (limited to 'src')
-rw-r--r--src/core/security/server_auth_filter.c2
-rw-r--r--src/core/surface/call.c14
-rw-r--r--src/core/transport/byte_stream.c8
-rw-r--r--src/core/transport/byte_stream.h5
-rw-r--r--src/core/transport/chttp2/frame_data.c2
-rw-r--r--src/core/transport/chttp2_transport.c33
6 files changed, 37 insertions, 27 deletions
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 5cfee6d139..d5c8c54369 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -140,7 +140,7 @@ static void on_md_processing_done(
message = gpr_slice_from_copied_string(error_details);
calld->transport_op.send_initial_metadata = NULL;
if (calld->transport_op.send_message != NULL) {
- grpc_byte_stream_destroy(calld->transport_op.send_message);
+ grpc_byte_stream_destroy(&exec_ctx, calld->transport_op.send_message);
calld->transport_op.send_message = NULL;
}
calld->transport_op.send_trailing_metadata = NULL;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index f8dde0748b..73c1996908 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -360,7 +360,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
&c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
if (c->receiving_stream != NULL) {
- grpc_byte_stream_destroy(c->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
@@ -951,7 +951,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
@@ -979,7 +979,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
call->receiving_slice);
continue_receiving_slices(exec_ctx, bctl);
} else {
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
@@ -1068,7 +1068,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
if (call->receiving_stream == NULL) {
*call->receiving_buffer = NULL;
- call->receiving_message = 0;
+ call->receiving_message = 0;
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
}
@@ -1076,10 +1076,10 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_channel_get_max_message_length(call->channel)) {
cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
"Max message size exceeded");
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
*call->receiving_buffer = NULL;
- call->receiving_message = 0;
+ call->receiving_message = 0;
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
}
@@ -1367,7 +1367,7 @@ done_with_error:
}
if (bctl->send_message) {
call->sending_message = 0;
- grpc_byte_stream_destroy(&call->sending_stream.base);
+ grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
}
if (bctl->send_final_op) {
call->sent_final_op = 0;
diff --git a/src/core/transport/byte_stream.c b/src/core/transport/byte_stream.c
index 81e8e77ccb..89e20489e7 100644
--- a/src/core/transport/byte_stream.c
+++ b/src/core/transport/byte_stream.c
@@ -44,8 +44,9 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
on_complete);
}
-void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) {
- byte_stream->destroy(byte_stream);
+void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream) {
+ byte_stream->destroy(exec_ctx, byte_stream);
}
/* slice_buffer_stream */
@@ -61,7 +62,8 @@ static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
return 1;
}
-static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) {}
+static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream) {}
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
gpr_slice_buffer *slice_buffer,
diff --git a/src/core/transport/byte_stream.h b/src/core/transport/byte_stream.h
index c94d8ff275..5f2fe573e8 100644
--- a/src/core/transport/byte_stream.h
+++ b/src/core/transport/byte_stream.h
@@ -52,7 +52,7 @@ struct grpc_byte_stream {
int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
grpc_closure *on_complete);
- void (*destroy)(grpc_byte_stream *byte_stream);
+ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
};
/* returns 1 if the bytes are available immediately (in which case
@@ -72,7 +72,8 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, gpr_slice *slice,
size_t max_size_hint, grpc_closure *on_complete);
-void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream);
+void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream);
/* grpc_byte_stream that wraps a slice buffer */
typedef struct grpc_slice_buffer_stream {
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 60a3ce23d5..5b16ce6334 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -58,7 +58,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
}
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
- grpc_byte_stream_destroy(bs);
+ grpc_byte_stream_destroy(exec_ctx, bs);
}
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 4561c0bfa9..70f7eed4fe 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -134,7 +134,12 @@ static void connectivity_state_set(
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
+static void incoming_byte_stream_update_flow_control(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
+ size_t have_already);
+
+static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global);
/*
@@ -532,7 +537,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- grpc_byte_stream_destroy(bs);
+ grpc_byte_stream_destroy(exec_ctx, bs);
}
GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
@@ -642,7 +647,8 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) {
+ while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
+ &stream_global)) {
fail_pending_writes(exec_ctx, stream_global);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
}
@@ -867,6 +873,13 @@ static void perform_stream_op_locked(
GPR_ASSERT(stream_global->recv_message_ready == NULL);
stream_global->recv_message_ready = op->recv_message_ready;
stream_global->recv_message = op->recv_message;
+ if (stream_global->id != 0 &&
+ (stream_global->incoming_frames.head == NULL ||
+ stream_global->incoming_frames.head->is_tail)) {
+ incoming_byte_stream_update_flow_control(
+ transport_global, stream_global, transport_global->stream_lookahead,
+ 0);
+ }
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -1021,7 +1034,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
while (stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- grpc_byte_stream_destroy(bs);
+ grpc_byte_stream_destroy(exec_ctx, bs);
}
if (stream_global->incoming_frames.head == NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1122,7 +1135,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
}
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
+static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_initial_metadata_finished, 0);
@@ -1528,7 +1541,8 @@ static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
}
}
-static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) {
+static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream) {
incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
}
@@ -1598,13 +1612,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
add_to_queue->tail->next_message = incoming_byte_stream;
}
add_to_queue->tail = incoming_byte_stream;
- if (frame_size == 0) {
- lock(TRANSPORT_FROM_PARSING(transport_parsing));
- incoming_byte_stream_update_flow_control(
- &TRANSPORT_FROM_PARSING(transport_parsing)->global,
- &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
- unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
- }
return incoming_byte_stream;
}