aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-16 17:06:31 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-16 17:06:31 -0700
commit31123db74e37d75eeb12b35a2f07e5655914158b (patch)
treee600cac3a262c574384a4ee845c3920caba7ca6c
parentf73fcd1cb9f757e78c7b76205188a0f36b923cf0 (diff)
Implement more missing pieces
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c36
-rw-r--r--src/core/transport/chttp2/incoming_metadata.h5
-rw-r--r--src/core/transport/chttp2/parsing.c1
-rw-r--r--src/core/transport/chttp2/stream_lists.c10
-rw-r--r--src/core/transport/chttp2/writing.c1
-rw-r--r--src/core/transport/chttp2_transport.c19
6 files changed, 69 insertions, 3 deletions
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index 5f32947df6..87b0a23795 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -87,6 +87,42 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_
grpc_sopb_add_metadata(sopb, b);
}
+void grpc_chttp2_incoming_metadata_buffer_swap(grpc_chttp2_incoming_metadata_buffer *a, grpc_chttp2_incoming_metadata_buffer *b) {
+ GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b);
+}
+
+void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
+ grpc_chttp2_incoming_metadata_buffer *src,
+ grpc_chttp2_incoming_metadata_buffer *dst,
+ grpc_stream_op_buffer *sopb) {
+ size_t delta;
+ size_t i;
+ if (gpr_time_cmp(dst->deadline, gpr_inf_future) == 0) {
+ dst->deadline = src->deadline;
+ } else if (gpr_time_cmp(src->deadline, gpr_inf_future) != 0) {
+ dst->deadline = gpr_time_min(src->deadline, dst->deadline);
+ }
+
+ if (src->count == 0) {
+ return;
+ }
+ if (dst->count == 0) {
+ grpc_chttp2_incoming_metadata_buffer_swap(src, dst);
+ return;
+ }
+ delta = dst->count;
+ if (dst->capacity < src->count + dst->count) {
+ dst->capacity = GPR_MAX(dst->capacity * 2, src->count + dst->count);
+ dst->elems = gpr_realloc(dst->elems, dst->capacity * sizeof(*dst->elems));
+ }
+ memcpy(dst->elems + dst->count, src->elems, src->count * sizeof(*src->elems));
+ dst->count += src->count;
+ for (i = 0; i < sopb->nops; i++) {
+ if (sopb->ops[i].type != GRPC_OP_METADATA) continue;
+ sopb->ops[i].data.metadata.list.tail = (void*)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
+ }
+}
+
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) {
diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h
index 5a7890a534..bc7e3816bc 100644
--- a/src/core/transport/chttp2/incoming_metadata.h
+++ b/src/core/transport/chttp2/incoming_metadata.h
@@ -66,6 +66,11 @@ void grpc_chttp2_incoming_metadata_buffer_set_deadline(
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
+void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
+ grpc_chttp2_incoming_metadata_buffer *src,
+ grpc_chttp2_incoming_metadata_buffer *dst,
+ grpc_stream_op_buffer *sopb);
+
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8705571806..1acf0a4b91 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -186,6 +186,7 @@ void grpc_chttp2_publish_reads(
/* publish incoming stream ops */
if (stream_parsing->data_parser.incoming_sopb.nops > 0) {
+ grpc_incoming_metadata_buffer_move_to_referencing_sopb(&stream_parsing->incoming_metadata, &stream_global->incoming_metadata, &stream_parsing->data_parser.incoming_sopb);
grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, &stream_global->incoming_sopb);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index dfead28e76..24d8737ceb 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -188,6 +188,7 @@ int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITTEN);
+ *stream_global = &stream->global;
*stream_writing = &stream->writing;
return r;
}
@@ -296,6 +297,15 @@ void grpc_chttp2_list_add_read_write_state_changed(
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
}
+int grpc_chttp2_list_pop_read_write_state_changed(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_stream *stream;
+ int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
+ *stream_global = &stream->global;
+ return r;
+}
+
void grpc_chttp2_list_add_incoming_window_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 291c088d75..e83ec632b5 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -190,6 +190,7 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
+ gpr_log(GPR_DEBUG, "sc:%d ws:%d", (int)stream_writing->send_closed, stream_global->write_state);
if (stream_writing->send_closed != DONT_SEND_CLOSED) {
stream_global->write_state = WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 181a6b8c58..c4838cceb8 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -365,6 +365,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
*t->accepting_stream = s;
grpc_chttp2_list_add_incoming_window_updated(&t->global, &s->global);
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
+ s->global.in_stream_map = 1;
}
if (initial_op) perform_op_locked(&t->global, &s->global, initial_op);
@@ -561,6 +562,7 @@ static void maybe_start_some_streams(
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
+ stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++;
grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
@@ -612,6 +614,7 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
stream_global->recv_done_closure = op->on_done_recv;
stream_global->publish_sopb = op->recv_ops;
stream_global->publish_sopb->nops = 0;
+ stream_global->publish_state = op->recv_state;
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&stream_global->outstanding_metadata);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
@@ -708,14 +711,24 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
static void unlock_check_reads(grpc_chttp2_transport *t) {
grpc_chttp2_stream_global *stream_global;
+ grpc_stream_state state;
- while (grpc_chttp2_pop_read_write_state_changed(&t->global, &stream_global)) {
+ while (grpc_chttp2_list_pop_read_write_state_changed(&t->global, &stream_global)) {
if (!stream_global->publish_sopb) {
continue;
}
+ state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map);
+ gpr_log(GPR_DEBUG, "ws:%d rc:%d ism:%d => st:%d", stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, state);
+ if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) {
+ continue;
+ }
+ grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
- /* TODO(ctiller): we need to not publish closed until !writing, or define a new STREAM_DELETABLE state */
- stream_global->published_state = *stream_global->publish_state = compute_state(stream_global->write_closed, stream_global->read_closed && !stream_global->in_stream_map);
+ stream_global->published_state = *stream_global->publish_state = state;
+ grpc_chttp2_schedule_closure(&t->global, stream_global->recv_done_closure, 1);
+ stream_global->recv_done_closure = NULL;
+ stream_global->publish_sopb = NULL;
+ stream_global->publish_state = NULL;
}
}