diff options
author | Yang Gao <yangg@google.com> | 2015-08-05 11:47:30 -0700 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-08-05 11:47:30 -0700 |
commit | d2e9cb3b92623c059112668517593d096f28333f (patch) | |
tree | f36378df6efadab4046ac4a0adfd26adb30efdd1 | |
parent | 9f007f059764133a49fdf7210ce9881bc38e9c05 (diff) | |
parent | 220adcc745a9c60cf3bab00f020b2fce8c3b7c9d (diff) |
Merge pull request #2765 from ctiller/the-test-be-sleepy
Fix edge cases in CHTTP2 resulting in lose of send close from clients
-rw-r--r-- | src/core/client_config/subchannel.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 34 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 2 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 25 |
6 files changed, 50 insertions, 22 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 17773bd2f4..f428b0fb6a 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -511,8 +511,6 @@ static void publish_transport(grpc_subchannel *c) { connection *destroy_connection = NULL; grpc_channel_element *elem; - gpr_log(GPR_DEBUG, "publish_transport: %p", c->master); - /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; filters = gpr_malloc(sizeof(*filters) * num_filters); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index f0eeb6de50..cb428f8e3c 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -119,6 +119,10 @@ typedef enum { GRPC_WRITE_STATE_SENT_CLOSE } grpc_chttp2_write_state; +/* flags that can be or'd into stream_global::writing_now */ +#define GRPC_CHTTP2_WRITING_DATA 1 +#define GRPC_CHTTP2_WRITING_WINDOW 2 + typedef enum { GRPC_DONT_SEND_CLOSED = 0, GRPC_SEND_CLOSED, @@ -382,7 +386,7 @@ typedef struct { gpr_uint8 published_cancelled; /** is this stream in the stream map? (boolean) */ gpr_uint8 in_stream_map; - /** is this stream actively being written? */ + /** bitmask of GRPC_CHTTP2_WRITING_xxx above */ gpr_uint8 writing_now; /** stream state already published to the upper layer */ diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 9e68c1e146..9c3ad7a777 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -164,9 +164,6 @@ void grpc_chttp2_list_add_first_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { GPR_ASSERT(stream_global->id != 0); - gpr_log(GPR_DEBUG, "add:%d:%d:%d:%d", stream_global->id, - stream_global->write_state, stream_global->in_stream_map, - stream_global->read_closed); stream_list_add_head(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index d39b0c42f7..b55e81fdca 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -77,7 +77,6 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->id = stream_global->id; stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; - GPR_ASSERT(!stream_global->writing_now); if (stream_global->outgoing_sopb) { window_delta = @@ -123,11 +122,13 @@ int grpc_chttp2_unlocking_check_writes( stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); - stream_global->writing_now = 1; - grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); - } else if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - stream_global->writing_now = 1; + stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW; + } + if (stream_writing->sopb.nops > 0 || + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA; + } + if (stream_global->writing_now != 0) { grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -183,6 +184,7 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, stream_writing->id, &transport_writing->hpack_compressor, &transport_writing->outbuf); + stream_writing->sopb.nops = 0; } if (stream_writing->announce_window > 0) { gpr_slice_buffer_add( @@ -191,7 +193,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { stream_writing->id, stream_writing->announce_window)); stream_writing->announce_window = 0; } - stream_writing->sopb.nops = 0; if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create(stream_writing->id, @@ -215,20 +216,23 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - GPR_ASSERT(stream_global->writing_now); - stream_global->writing_now = 0; - if (stream_global->outgoing_sopb != NULL && - stream_global->outgoing_sopb->nops == 0) { - stream_global->outgoing_sopb = NULL; - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 1); - } + GPR_ASSERT(stream_global->writing_now != 0); if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; if (!transport_global->is_client) { stream_global->read_closed = 1; } } + if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) { + if (stream_global->outgoing_sopb != NULL && + stream_global->outgoing_sopb->nops == 0) { + GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); + stream_global->outgoing_sopb = NULL; + grpc_chttp2_schedule_closure(transport_global, + stream_global->send_done_closure, 1); + } + } + stream_global->writing_now = 0; grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index f05ec99256..6ba144faa4 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -855,7 +855,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (!stream_global->publish_sopb) { continue; } - if (stream_global->writing_now) { + if (stream_global->writing_now != 0) { continue; } /* FIXME(ctiller): we include in_stream_map in our computation of diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f39c6cf82a..3144ca4dc7 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -830,6 +830,31 @@ TEST_F(End2endTest, HugeResponse) { EXPECT_TRUE(s.ok()); } +namespace { +void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, gpr_event *ev) { + EchoResponse resp; + gpr_event_set(ev, (void*)1); + while (stream->Read(&resp)) { + gpr_log(GPR_INFO, "Read message"); + } +} +} // namespace + +// Run a Read and a WritesDone simultaneously. +TEST_F(End2endTest, SimultaneousReadWritesDone) { + ResetStub(); + ClientContext context; + gpr_event ev; + gpr_event_init(&ev); + auto stream = stub_->BidiStream(&context); + std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev); + gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); + reader_thread.join(); +} + TEST_F(End2endTest, Peer) { ResetStub(); EchoRequest request; |