diff options
author | Vijay Pai <vpai@google.com> | 2018-03-19 23:28:29 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-03-20 08:46:28 -0700 |
commit | cdddc8ce421f120ca038c83f2a3628544beae21f (patch) | |
tree | a81d7f1d307f791971a4504983deeebb01a46ce6 | |
parent | 9bef1390540e7662b6d941c0a17f136b10ffc084 (diff) |
Desneak client unary call, avoid Hyrum's Law (used for 1-thread simplicity)
-rw-r--r-- | include/grpcpp/impl/codegen/async_unary_call.h | 51 | ||||
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 33 | ||||
-rw-r--r-- | test/cpp/end2end/nonblocking_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 2 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/fullstack_unary_ping_pong.h | 2 | ||||
-rw-r--r-- | test/cpp/performance/writes_per_rpc_test.cc | 2 |
6 files changed, 46 insertions, 46 deletions
diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h index 255f874e26..60ff8e2f05 100644 --- a/include/grpcpp/impl/codegen/async_unary_call.h +++ b/include/grpcpp/impl/codegen/async_unary_call.h @@ -126,9 +126,10 @@ class ClientAsyncResponseReader final assert(started_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - meta_buf.set_output_tag(tag); - meta_buf.RecvInitialMetadata(context_); - call_.PerformOps(&meta_buf); + single_buf.set_output_tag(tag); + single_buf.RecvInitialMetadata(context_); + call_.PerformOps(&single_buf); + initial_metadata_read_ = true; } /// See \a ClientAysncResponseReaderInterface::Finish for semantics. @@ -138,14 +139,20 @@ class ClientAsyncResponseReader final /// possible initial and trailing metadata sent from the server. void Finish(R* msg, Status* status, void* tag) override { assert(started_); - finish_buf.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_buf.RecvInitialMetadata(context_); + if (initial_metadata_read_) { + finish_buf.set_output_tag(tag); + finish_buf.RecvMessage(msg); + finish_buf.AllowNoMessage(); + finish_buf.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_buf); + } else { + single_buf.set_output_tag(tag); + single_buf.RecvInitialMetadata(context_); + single_buf.RecvMessage(msg); + single_buf.AllowNoMessage(); + single_buf.ClientRecvStatus(context_, status); + call_.PerformOps(&single_buf); } - finish_buf.RecvMessage(msg); - finish_buf.AllowNoMessage(); - finish_buf.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf); } private: @@ -153,6 +160,7 @@ class ClientAsyncResponseReader final ClientContext* const context_; ::grpc::internal::Call call_; bool started_; + bool initial_metadata_read_ = false; template <class W> ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, @@ -160,30 +168,29 @@ class ClientAsyncResponseReader final : context_(context), call_(call), started_(start) { // Bind the metadata at time of StartCallInternal but set up the rest here // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_buf.SendMessage(request).ok()); - init_buf.ClientSendClose(); + GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok()); + single_buf.ClientSendClose(); if (start) StartCallInternal(); } void StartCallInternal() { - init_buf.SendInitialMetadata(context_->send_initial_metadata_, - context_->initial_metadata_flags()); - call_.PerformOps(&init_buf); + single_buf.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); } // disable operator new static void* operator new(std::size_t size); static void* operator new(std::size_t size, void* p) { return p; } - ::grpc::internal::SneakyCallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> - init_buf; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - meta_buf; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose, + ::grpc::internal::CallOpRecvInitialMetadata, ::grpc::internal::CallOpRecvMessage<R>, ::grpc::internal::CallOpClientRecvStatus> + single_buf; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>, + ::grpc::internal::CallOpClientRecvStatus> finish_buf; }; diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index dd777d10c2..d22793e23c 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -319,12 +319,13 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); @@ -434,13 +435,13 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(2, true).Verify(cq_.get(), time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(3, true).Expect(4, true).Verify( cq_.get(), std::chrono::system_clock::time_point::max()); @@ -475,21 +476,18 @@ TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { auto resp_writer_ptr = &response_writer; auto lambda_2 = [&, this, resp_writer_ptr]() { - gpr_log(GPR_ERROR, "CALLED"); service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), cq_.get(), tag(2)); }; + response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2); EXPECT_EQ(send_request.message(), recv_request.message()); - auto recv_resp_ptr = &recv_response; - auto status_ptr = &recv_status; send_response.set_message(recv_request.message()); auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); }; - response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); Verifier().Expect(3, true).Expect(4, true).Verify( cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3); @@ -887,6 +885,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + response_reader->Finish(&recv_response, &recv_status, tag(4)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); @@ -903,7 +902,6 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); @@ -929,6 +927,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + response_reader->ReadInitialMetadata(tag(4)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); @@ -937,10 +936,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second); response_writer.SendInitialMetadata(tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); - - response_reader->ReadInitialMetadata(tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta1.second, ToString(server_initial_metadata.find(meta1.first)->second)); @@ -976,6 +972,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + response_reader->Finish(&recv_response, &recv_status, tag(5)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); @@ -988,7 +985,6 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); - response_reader->Finish(&recv_response, &recv_status, tag(5)); Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get()); @@ -1036,6 +1032,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + response_reader->ReadInitialMetadata(tag(4)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); @@ -1051,9 +1048,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); response_writer.SendInitialMetadata(tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); - response_reader->ReadInitialMetadata(tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, ToString(server_initial_metadata.find(meta3.first)->second)); @@ -1096,6 +1091,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + response_reader->Finish(&recv_response, &recv_status, tag(4)); srv_ctx.AsyncNotifyWhenDone(tag(5)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -1105,12 +1101,9 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { EXPECT_EQ(send_request.message(), recv_request.message()); cli_ctx.TryCancel(); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); - EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); } @@ -1131,6 +1124,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + response_reader->Finish(&recv_response, &recv_status, tag(4)); srv_ctx.AsyncNotifyWhenDone(tag(5)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -1141,7 +1135,6 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get()); EXPECT_FALSE(srv_ctx.IsCancelled()); diff --git a/test/cpp/end2end/nonblocking_test.cc b/test/cpp/end2end/nonblocking_test.cc index cb75848337..d8337baca2 100644 --- a/test/cpp/end2end/nonblocking_test.cc +++ b/test/cpp/end2end/nonblocking_test.cc @@ -128,6 +128,7 @@ class NonblockingTest : public ::testing::Test { stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get())); response_reader->StartCall(); + response_reader->Finish(&recv_response, &recv_status, tag(4)); service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); @@ -141,7 +142,6 @@ class NonblockingTest : public ::testing::Test { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); int tagsum = 0; int tagprod = 1; diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 294f1feb80..3b21c4c278 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -394,13 +394,13 @@ static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); void* t; bool ok; + response_reader->Finish(&recv_response, &recv_status, tag(4)); TrickleCQNext(fixture.get(), &t, &ok, in_warmup ? -1 : state.iterations()); GPR_ASSERT(ok); GPR_ASSERT(t == tag(0) || t == tag(1)); intptr_t slot = reinterpret_cast<intptr_t>(t); ServerEnv* senv = server_env[slot]; senv->response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); for (int i = (1 << 3) | (1 << 4); i != 0;) { TrickleCQNext(fixture.get(), &t, &ok, in_warmup ? -1 : state.iterations()); diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h index a85c33c320..843c8e1486 100644 --- a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h +++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h @@ -78,6 +78,7 @@ static void BM_UnaryPingPong(benchmark::State& state) { ClientContextMutator cli_ctx_mut(&cli_ctx); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); + response_reader->Finish(&recv_response, &recv_status, tag(4)); void* t; bool ok; GPR_ASSERT(fixture->cq()->Next(&t, &ok)); @@ -87,7 +88,6 @@ static void BM_UnaryPingPong(benchmark::State& state) { ServerEnv* senv = server_env[slot]; ServerContextMutator svr_ctx_mut(&senv->ctx); senv->response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); for (int i = (1 << 3) | (1 << 4); i != 0;) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 5faa7ba757..0ea3181f7e 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -207,13 +207,13 @@ static double UnaryPingPong(int request_size, int response_size) { stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); void* t; bool ok; + response_reader->Finish(&recv_response, &recv_status, tag(4)); GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); GPR_ASSERT(t == tag(0) || t == tag(1)); intptr_t slot = reinterpret_cast<intptr_t>(t); ServerEnv* senv = server_env[slot]; senv->response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); for (int i = (1 << 3) | (1 << 4); i != 0;) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); |