diff options
-rw-r--r-- | include/grpc++/impl/codegen/method_handler_impl.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 20 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 25 |
3 files changed, 41 insertions, 7 deletions
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 5afa4e9f7e..6ec2836037 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -188,6 +188,9 @@ class TemplatedBidiStreamingHandler : public MethodHandler { } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); + if (param.server_context->has_hanging_ops_) { + param.call->cq()->Pluck(¶m.server_context->hanging_ops_); + } param.call->cq()->Pluck(&ops); } diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index b7d81b3382..677fa78d9e 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -661,20 +661,26 @@ class ServerReaderWriterBody final { if (options.is_last_message()) { options.set_buffer_hint(); } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { + if (!ctx_->hanging_ops_.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); + ctx_->hanging_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); + ctx_->hanging_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); + call_->PerformOps(&ctx_->hanging_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_hanging_ops_ = true; + return true; + } + return call_->cq()->Pluck(&ctx_->hanging_ops_); } private: diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index abaef9da2e..d72dda3f59 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -852,6 +852,31 @@ TEST_P(End2endTest, BidiStreamWithCoalescingApi) { EXPECT_TRUE(s.ok()); } +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "1"); + context.set_initial_metadata_corked(true); + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { |