diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-16 17:06:31 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-16 17:06:31 -0700 |
commit | 31123db74e37d75eeb12b35a2f07e5655914158b (patch) | |
tree | e600cac3a262c574384a4ee845c3920caba7ca6c /src | |
parent | f73fcd1cb9f757e78c7b76205188a0f36b923cf0 (diff) |
Implement more missing pieces
Diffstat (limited to 'src')
-rw-r--r-- | src/core/transport/chttp2/incoming_metadata.c | 36 | ||||
-rw-r--r-- | src/core/transport/chttp2/incoming_metadata.h | 5 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 1 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 1 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 19 |
6 files changed, 69 insertions, 3 deletions
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 5f32947df6..87b0a23795 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -87,6 +87,42 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_ grpc_sopb_add_metadata(sopb, b); } +void grpc_chttp2_incoming_metadata_buffer_swap(grpc_chttp2_incoming_metadata_buffer *a, grpc_chttp2_incoming_metadata_buffer *b) { + GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b); +} + +void grpc_incoming_metadata_buffer_move_to_referencing_sopb( + grpc_chttp2_incoming_metadata_buffer *src, + grpc_chttp2_incoming_metadata_buffer *dst, + grpc_stream_op_buffer *sopb) { + size_t delta; + size_t i; + if (gpr_time_cmp(dst->deadline, gpr_inf_future) == 0) { + dst->deadline = src->deadline; + } else if (gpr_time_cmp(src->deadline, gpr_inf_future) != 0) { + dst->deadline = gpr_time_min(src->deadline, dst->deadline); + } + + if (src->count == 0) { + return; + } + if (dst->count == 0) { + grpc_chttp2_incoming_metadata_buffer_swap(src, dst); + return; + } + delta = dst->count; + if (dst->capacity < src->count + dst->count) { + dst->capacity = GPR_MAX(dst->capacity * 2, src->count + dst->count); + dst->elems = gpr_realloc(dst->elems, dst->capacity * sizeof(*dst->elems)); + } + memcpy(dst->elems + dst->count, src->elems, src->count * sizeof(*src->elems)); + dst->count += src->count; + for (i = 0; i < sopb->nops; i++) { + if (sopb->ops[i].type != GRPC_OP_METADATA) continue; + sopb->ops[i].data.metadata.list.tail = (void*)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); + } +} + void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) { diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h index 5a7890a534..bc7e3816bc 100644 --- a/src/core/transport/chttp2/incoming_metadata.h +++ b/src/core/transport/chttp2/incoming_metadata.h @@ -66,6 +66,11 @@ void grpc_chttp2_incoming_metadata_buffer_set_deadline( void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb); +void grpc_incoming_metadata_buffer_move_to_referencing_sopb( + grpc_chttp2_incoming_metadata_buffer *src, + grpc_chttp2_incoming_metadata_buffer *dst, + grpc_stream_op_buffer *sopb); + void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8705571806..1acf0a4b91 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -186,6 +186,7 @@ void grpc_chttp2_publish_reads( /* publish incoming stream ops */ if (stream_parsing->data_parser.incoming_sopb.nops > 0) { + grpc_incoming_metadata_buffer_move_to_referencing_sopb(&stream_parsing->incoming_metadata, &stream_global->incoming_metadata, &stream_parsing->data_parser.incoming_sopb); grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, &stream_global->incoming_sopb); grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index dfead28e76..24d8737ceb 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -188,6 +188,7 @@ int grpc_chttp2_list_pop_written_stream( grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITTEN); + *stream_global = &stream->global; *stream_writing = &stream->writing; return r; } @@ -296,6 +297,15 @@ void grpc_chttp2_list_add_read_write_state_changed( GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); } +int grpc_chttp2_list_pop_read_write_state_changed( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_stream *stream; + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); + *stream_global = &stream->global; + return r; +} + void grpc_chttp2_list_add_incoming_window_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 291c088d75..e83ec632b5 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -190,6 +190,7 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { + gpr_log(GPR_DEBUG, "sc:%d ws:%d", (int)stream_writing->send_closed, stream_global->write_state); if (stream_writing->send_closed != DONT_SEND_CLOSED) { stream_global->write_state = WRITE_STATE_SENT_CLOSE; if (!transport_global->is_client) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 181a6b8c58..c4838cceb8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -365,6 +365,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, *t->accepting_stream = s; grpc_chttp2_list_add_incoming_window_updated(&t->global, &s->global); grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s); + s->global.in_stream_map = 1; } if (initial_op) perform_op_locked(&t->global, &s->global, initial_op); @@ -561,6 +562,7 @@ static void maybe_start_some_streams( grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); + stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); @@ -612,6 +614,7 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, stream_global->recv_done_closure = op->on_done_recv; stream_global->publish_sopb = op->recv_ops; stream_global->publish_sopb->nops = 0; + stream_global->publish_state = op->recv_state; grpc_chttp2_incoming_metadata_live_op_buffer_end( &stream_global->outstanding_metadata); grpc_chttp2_list_add_read_write_state_changed(transport_global, @@ -708,14 +711,24 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, static void unlock_check_reads(grpc_chttp2_transport *t) { grpc_chttp2_stream_global *stream_global; + grpc_stream_state state; - while (grpc_chttp2_pop_read_write_state_changed(&t->global, &stream_global)) { + while (grpc_chttp2_list_pop_read_write_state_changed(&t->global, &stream_global)) { if (!stream_global->publish_sopb) { continue; } + state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map); + gpr_log(GPR_DEBUG, "ws:%d rc:%d ism:%d => st:%d", stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, state); + if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) { + continue; + } + grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); - /* TODO(ctiller): we need to not publish closed until !writing, or define a new STREAM_DELETABLE state */ - stream_global->published_state = *stream_global->publish_state = compute_state(stream_global->write_closed, stream_global->read_closed && !stream_global->in_stream_map); + stream_global->published_state = *stream_global->publish_state = state; + grpc_chttp2_schedule_closure(&t->global, stream_global->recv_done_closure, 1); + stream_global->recv_done_closure = NULL; + stream_global->publish_sopb = NULL; + stream_global->publish_state = NULL; } } |