From 6905b7ce0c455ea3be677fdc657804b97e9f6815 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 16 Jun 2015 15:33:33 -0700 Subject: Implement more missing pieces --- src/core/transport/stream_op.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'src/core/transport/stream_op.c') diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 8996ecac35..8b5549cd4c 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -163,6 +163,19 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, sopb->nops = new_nops; } +void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst) { + size_t i; + if (src->nops == 0) { + return; + } + if (dst->nops == 0) { + grpc_sopb_swap(src, dst); + return; + } + grpc_sopb_append(dst, src->ops, src->nops); + src->ops = 0; +} + static void assert_valid_list(grpc_mdelem_list *list) { #ifndef NDEBUG grpc_linked_mdelem *l; -- cgit v1.2.3 From f73fcd1cb9f757e78c7b76205188a0f36b923cf0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 16 Jun 2015 16:25:26 -0700 Subject: Fixing stuff --- src/core/transport/chttp2/internal.h | 3 +++ src/core/transport/chttp2/parsing.c | 5 +++++ src/core/transport/chttp2_transport.c | 34 ++++++++++++++++++++++++---------- src/core/transport/stream_op.c | 3 +-- 4 files changed, 33 insertions(+), 12 deletions(-) (limited to 'src/core/transport/stream_op.c') diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b60811d71e..88eb790d12 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -595,6 +595,9 @@ int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( void grpc_chttp2_list_add_read_write_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_read_write_state_changed( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global); void grpc_chttp2_list_add_incoming_window_state_changed( grpc_chttp2_transport_global *transport_global, diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 91b509dc9a..8705571806 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -185,6 +185,11 @@ void grpc_chttp2_publish_reads( } /* publish incoming stream ops */ + if (stream_parsing->data_parser.incoming_sopb.nops > 0) { + grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, &stream_global->incoming_sopb); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); + } } } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6bb9c72035..181a6b8c58 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -80,6 +80,7 @@ static void unlock(grpc_chttp2_transport *t); static void unlock_check_cancellations(grpc_chttp2_transport *t); static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); +static void unlock_check_reads(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); @@ -345,6 +346,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata); grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata); grpc_sopb_init(&s->writing.sopb); + grpc_sopb_init(&s->global.incoming_sopb); grpc_chttp2_data_parser_init(&s->parsing.data_parser); ref_transport(t); @@ -446,6 +448,7 @@ static void unlock(grpc_chttp2_transport *t) { grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } unlock_check_cancellations(t); + unlock_check_reads(t); /* unlock_check_parser(t); */ unlock_check_channel_callbacks(t); @@ -695,6 +698,27 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { #endif } +static grpc_stream_state compute_state(gpr_uint8 write_closed, + gpr_uint8 read_closed) { + if (write_closed && read_closed) return GRPC_STREAM_CLOSED; + if (write_closed) return GRPC_STREAM_SEND_CLOSED; + if (read_closed) return GRPC_STREAM_RECV_CLOSED; + return GRPC_STREAM_OPEN; +} + +static void unlock_check_reads(grpc_chttp2_transport *t) { + grpc_chttp2_stream_global *stream_global; + + while (grpc_chttp2_pop_read_write_state_changed(&t->global, &stream_global)) { + if (!stream_global->publish_sopb) { + continue; + } + grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); + /* TODO(ctiller): we need to not publish closed until !writing, or define a new STREAM_DELETABLE state */ + stream_global->published_state = *stream_global->publish_state = compute_state(stream_global->write_closed, stream_global->read_closed && !stream_global->in_stream_map); + } +} + static void cancel_from_api(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status) { @@ -928,16 +952,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, * CALLBACK LOOP */ -#if 0 -static grpc_stream_state compute_state(gpr_uint8 write_closed, - gpr_uint8 read_closed) { - if (write_closed && read_closed) return GRPC_STREAM_CLOSED; - if (write_closed) return GRPC_STREAM_SEND_CLOSED; - if (read_closed) return GRPC_STREAM_RECV_CLOSED; - return GRPC_STREAM_OPEN; -} -#endif - typedef struct { grpc_chttp2_transport *t; gpr_uint32 error; diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 8b5549cd4c..cad17035d1 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -164,7 +164,6 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, } void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst) { - size_t i; if (src->nops == 0) { return; } @@ -173,7 +172,7 @@ void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst) { return; } grpc_sopb_append(dst, src->ops, src->nops); - src->ops = 0; + src->nops = 0; } static void assert_valid_list(grpc_mdelem_list *list) { -- cgit v1.2.3 From 159a774fdd05d22deaca47043fa913455d0da94c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 18 Jun 2015 11:51:31 -0700 Subject: Fix type in swap buffer --- src/core/transport/stream_op.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/transport/stream_op.c') diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index cad17035d1..efb33ddf68 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -65,7 +65,7 @@ void grpc_sopb_swap(grpc_stream_op_buffer *a, grpc_stream_op_buffer *b) { if (a->ops == a->inlined_ops) { if (b->ops == b->inlined_ops) { /* swap contents of inlined buffer */ - gpr_slice temp[GRPC_SOPB_INLINE_ELEMENTS]; + grpc_stream_op temp[GRPC_SOPB_INLINE_ELEMENTS]; memcpy(temp, a->ops, b->nops * sizeof(grpc_stream_op)); memcpy(a->ops, b->ops, a->nops * sizeof(grpc_stream_op)); memcpy(b->ops, temp, b->nops * sizeof(grpc_stream_op)); -- cgit v1.2.3