aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-03-19 23:28:29 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-03-20 08:46:28 -0700
commitcdddc8ce421f120ca038c83f2a3628544beae21f (patch)
treea81d7f1d307f791971a4504983deeebb01a46ce6
parent9bef1390540e7662b6d941c0a17f136b10ffc084 (diff)
Desneak client unary call, avoid Hyrum's Law (used for 1-thread simplicity)
-rw-r--r--include/grpcpp/impl/codegen/async_unary_call.h51
-rw-r--r--test/cpp/end2end/async_end2end_test.cc33
-rw-r--r--test/cpp/end2end/nonblocking_test.cc2
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_trickle.cc2
-rw-r--r--test/cpp/microbenchmarks/fullstack_unary_ping_pong.h2
-rw-r--r--test/cpp/performance/writes_per_rpc_test.cc2
6 files changed, 46 insertions, 46 deletions
diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h
index 255f874e26..60ff8e2f05 100644
--- a/include/grpcpp/impl/codegen/async_unary_call.h
+++ b/include/grpcpp/impl/codegen/async_unary_call.h
@@ -126,9 +126,10 @@ class ClientAsyncResponseReader final
assert(started_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- meta_buf.set_output_tag(tag);
- meta_buf.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf);
+ single_buf.set_output_tag(tag);
+ single_buf.RecvInitialMetadata(context_);
+ call_.PerformOps(&single_buf);
+ initial_metadata_read_ = true;
}
/// See \a ClientAysncResponseReaderInterface::Finish for semantics.
@@ -138,14 +139,20 @@ class ClientAsyncResponseReader final
/// possible initial and trailing metadata sent from the server.
void Finish(R* msg, Status* status, void* tag) override {
assert(started_);
- finish_buf.set_output_tag(tag);
- if (!context_->initial_metadata_received_) {
- finish_buf.RecvInitialMetadata(context_);
+ if (initial_metadata_read_) {
+ finish_buf.set_output_tag(tag);
+ finish_buf.RecvMessage(msg);
+ finish_buf.AllowNoMessage();
+ finish_buf.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_buf);
+ } else {
+ single_buf.set_output_tag(tag);
+ single_buf.RecvInitialMetadata(context_);
+ single_buf.RecvMessage(msg);
+ single_buf.AllowNoMessage();
+ single_buf.ClientRecvStatus(context_, status);
+ call_.PerformOps(&single_buf);
}
- finish_buf.RecvMessage(msg);
- finish_buf.AllowNoMessage();
- finish_buf.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf);
}
private:
@@ -153,6 +160,7 @@ class ClientAsyncResponseReader final
ClientContext* const context_;
::grpc::internal::Call call_;
bool started_;
+ bool initial_metadata_read_ = false;
template <class W>
ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context,
@@ -160,30 +168,29 @@ class ClientAsyncResponseReader final
: context_(context), call_(call), started_(start) {
// Bind the metadata at time of StartCallInternal but set up the rest here
// TODO(ctiller): don't assert
- GPR_CODEGEN_ASSERT(init_buf.SendMessage(request).ok());
- init_buf.ClientSendClose();
+ GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok());
+ single_buf.ClientSendClose();
if (start) StartCallInternal();
}
void StartCallInternal() {
- init_buf.SendInitialMetadata(context_->send_initial_metadata_,
- context_->initial_metadata_flags());
- call_.PerformOps(&init_buf);
+ single_buf.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
}
// disable operator new
static void* operator new(std::size_t size);
static void* operator new(std::size_t size, void* p) { return p; }
- ::grpc::internal::SneakyCallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage,
- ::grpc::internal::CallOpClientSendClose>
- init_buf;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
- meta_buf;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose,
+ ::grpc::internal::CallOpRecvInitialMetadata,
::grpc::internal::CallOpRecvMessage<R>,
::grpc::internal::CallOpClientRecvStatus>
+ single_buf;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_buf;
};
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index dd777d10c2..d22793e23c 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -319,12 +319,13 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(), cq_.get(), tag(2));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+
Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
@@ -434,13 +435,13 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(3, true).Expect(4, true).Verify(
cq_.get(), std::chrono::system_clock::time_point::max());
@@ -475,21 +476,18 @@ TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
auto resp_writer_ptr = &response_writer;
auto lambda_2 = [&, this, resp_writer_ptr]() {
- gpr_log(GPR_ERROR, "CALLED");
service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
cq_.get(), tag(2));
};
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
EXPECT_EQ(send_request.message(), recv_request.message());
- auto recv_resp_ptr = &recv_response;
- auto status_ptr = &recv_status;
send_response.set_message(recv_request.message());
auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
};
- response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
Verifier().Expect(3, true).Expect(4, true).Verify(
cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
@@ -887,6 +885,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
@@ -903,7 +902,6 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
@@ -929,6 +927,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+ response_reader->ReadInitialMetadata(tag(4));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
@@ -937,10 +936,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
-
- response_reader->ReadInitialMetadata(tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second,
ToString(server_initial_metadata.find(meta1.first)->second));
@@ -976,6 +972,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+ response_reader->Finish(&recv_response, &recv_status, tag(5));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
@@ -988,7 +985,6 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
- response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
@@ -1036,6 +1032,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+ response_reader->ReadInitialMetadata(tag(4));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
@@ -1051,9 +1048,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
- response_reader->ReadInitialMetadata(tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second,
ToString(server_initial_metadata.find(meta3.first)->second));
@@ -1096,6 +1091,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
srv_ctx.AsyncNotifyWhenDone(tag(5));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -1105,12 +1101,9 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel();
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
- response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
-
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1131,6 +1124,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
srv_ctx.AsyncNotifyWhenDone(tag(5));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -1141,7 +1135,6 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());
diff --git a/test/cpp/end2end/nonblocking_test.cc b/test/cpp/end2end/nonblocking_test.cc
index cb75848337..d8337baca2 100644
--- a/test/cpp/end2end/nonblocking_test.cc
+++ b/test/cpp/end2end/nonblocking_test.cc
@@ -128,6 +128,7 @@ class NonblockingTest : public ::testing::Test {
stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
response_reader->StartCall();
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(), cq_.get(), tag(2));
@@ -141,7 +142,6 @@ class NonblockingTest : public ::testing::Test {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
int tagsum = 0;
int tagprod = 1;
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index 294f1feb80..3b21c4c278 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -394,13 +394,13 @@ static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) {
stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
void* t;
bool ok;
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
TrickleCQNext(fixture.get(), &t, &ok, in_warmup ? -1 : state.iterations());
GPR_ASSERT(ok);
GPR_ASSERT(t == tag(0) || t == tag(1));
intptr_t slot = reinterpret_cast<intptr_t>(t);
ServerEnv* senv = server_env[slot];
senv->response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
for (int i = (1 << 3) | (1 << 4); i != 0;) {
TrickleCQNext(fixture.get(), &t, &ok,
in_warmup ? -1 : state.iterations());
diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
index a85c33c320..843c8e1486 100644
--- a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
+++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
@@ -78,6 +78,7 @@ static void BM_UnaryPingPong(benchmark::State& state) {
ClientContextMutator cli_ctx_mut(&cli_ctx);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
void* t;
bool ok;
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
@@ -87,7 +88,6 @@ static void BM_UnaryPingPong(benchmark::State& state) {
ServerEnv* senv = server_env[slot];
ServerContextMutator svr_ctx_mut(&senv->ctx);
senv->response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
for (int i = (1 << 3) | (1 << 4); i != 0;) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
GPR_ASSERT(ok);
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
index 5faa7ba757..0ea3181f7e 100644
--- a/test/cpp/performance/writes_per_rpc_test.cc
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -207,13 +207,13 @@ static double UnaryPingPong(int request_size, int response_size) {
stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
void* t;
bool ok;
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
GPR_ASSERT(ok);
GPR_ASSERT(t == tag(0) || t == tag(1));
intptr_t slot = reinterpret_cast<intptr_t>(t);
ServerEnv* senv = server_env[slot];
senv->response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
for (int i = (1 << 3) | (1 << 4); i != 0;) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
GPR_ASSERT(ok);