diff options
author | Vijay Pai <vpai@google.com> | 2016-02-03 16:23:45 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2016-02-03 16:23:45 -0800 |
commit | 912fe05ed38f55a69c47d6753cdf5073f9250345 (patch) | |
tree | bfeb9331f86a54a324876e37dd3dce9a2b7ab339 /test/cpp | |
parent | feef8be5aeba73fa2d96ec282d1da8b885340f62 (diff) | |
parent | 87afe8c12f99cc73ed344be966b3eb7b00443a6f (diff) |
Merge branch 'master' into async_thread_stress_test
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 410 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 298 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 165 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 17 | ||||
-rw-r--r-- | test/cpp/qps/limit_cores.cc | 7 | ||||
-rw-r--r-- | test/cpp/qps/limit_cores.h | 2 |
6 files changed, 863 insertions, 36 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 0616cc07ee..252bda3798 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -32,6 +32,7 @@ */ #include <memory> +#include <thread> #include <grpc++/channel.h> #include <grpc++/client_context.h> @@ -104,7 +105,10 @@ class Verifier : public PollingCheckRegion { expectations_[tag(i)] = expect_ok; return *this; } - void Verify(CompletionQueue* cq) { + + void Verify(CompletionQueue* cq) { Verify(cq, false); } + + void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { bool ok; @@ -122,7 +126,9 @@ class Verifier : public PollingCheckRegion { } auto it = expectations_.find(got_tag); EXPECT_TRUE(it != expectations_.end()); - EXPECT_EQ(it->second, ok); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } expectations_.erase(it); } } @@ -217,7 +223,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -270,7 +276,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); std::chrono::system_clock::time_point time_now( @@ -315,7 +321,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), @@ -368,7 +374,7 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, @@ -418,7 +424,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), @@ -476,7 +482,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { cli_ctx.AddMetadata(meta1.first, meta1.second); cli_ctx.AddMetadata(meta2.first, meta2.second); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -519,7 +525,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -568,7 +574,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -629,7 +635,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { cli_ctx.AddMetadata(meta1.first, meta1.second); cli_ctx.AddMetadata(meta2.first, meta2.second); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -690,7 +696,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); @@ -725,7 +731,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); @@ -759,7 +765,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { ClientContext cli_ctx; send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); response_reader->Finish(&recv_response, &recv_status, tag(4)); @@ -769,8 +775,384 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { EXPECT_EQ("", recv_status.error_message()); } +// This class is for testing scenarios where RPCs are cancelled on the server +// by calling ServerContext::TryCancel() +class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { + protected: + typedef enum { + DO_NOT_CANCEL = 0, + CANCEL_BEFORE_PROCESSING, + CANCEL_DURING_PROCESSING, + CANCEL_AFTER_PROCESSING + } ServerTryCancelRequestPhase; + + void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel()"); + EXPECT_TRUE(context->IsCancelled()); + } + + // Helper for testing client-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading + // any messages from the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // messages from the client (but before sending any status back to the + // client) + void TestClientStreamingServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + // Initiate the 'RequestStream' call on client + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + + // On the server, request to be notified of 'RequestStream' calls + // and receive the 'RequestStream' call just made by the client + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + + // Client sends 3 messages (tags 3, 4 and 5) + for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { + send_request.set_message("Ping " + std::to_string(tag_idx)); + cli_stream->Write(send_request, tag(tag_idx)); + Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get()); + } + cli_stream->WritesDone(tag(6)); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + + bool expected_server_cq_result = true; + bool ignore_cq_result = false; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(&srv_ctx); + + // Since cancellation is done before server reads any results, we know + // for sure that all cq results will return false from this point forward + expected_server_cq_result = false; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + // Server will cancel the RPC in a parallel thread while reading the + // requests from the client. Since the cancellation can happen at anytime, + // some of the cq results (i.e those until cancellation) might be true but + // its non deterministic. So better to ignore the cq results + ignore_cq_result = true; + } + + // Server reads 3 messages (tags 6, 7 and 8) + for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { + srv_stream.Read(&recv_request, tag(tag_idx)); + Verifier(GetParam()) + .Expect(tag_idx, expected_server_cq_result) + .Verify(cq_.get(), ignore_cq_result); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(&srv_ctx); + } + + // The RPC has been cancelled at this point for sure (i.e irrespective of + // the value of `server_try_cancel` is). So, from this point forward, we + // know that cq results are supposed to return false on server. + + // Server sends the final message and cancelled status (but the RPC is + // already cancelled at this point. So we expect the operation to fail) + srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); + Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + + // Client will see the cancellation + cli_stream->Finish(&recv_status, tag(10)); + // TODO(sreek): The expectation here should be true. This is a bug (github + // issue #4972) + Verifier(GetParam()).Expect(10, false).Verify(cq_.get()); + EXPECT_FALSE(recv_status.ok()); + EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + } + + // Helper for testing server-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending + // any messages to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending + // messages to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all + // messages to the client (but before sending any status back to the + // client) + void TestServerStreamingServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message("Ping"); + // Initiate the 'ResponseStream' call on the client + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + // On the server, request to be notified of 'ResponseStream' calls and + // receive the call just made by the client + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + bool expected_cq_result = true; + bool ignore_cq_result = false; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(&srv_ctx); + + // We know for sure that all cq results will be false from this point + // since the server cancelled the RPC + expected_cq_result = false; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + + // Server will cancel the RPC in a parallel thread while writing responses + // to the client. Since the cancellation can happen at anytime, some of + // the cq results (i.e those until cancellation) might be true but it is + // non deterministic. So better to ignore the cq results + ignore_cq_result = true; + } + + // Server sends three messages (tags 3, 4 and 5) + for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { + send_response.set_message("Pong " + std::to_string(tag_idx)); + srv_stream.Write(send_response, tag(tag_idx)); + Verifier(GetParam()) + .Expect(tag_idx, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(&srv_ctx); + } + + // Client attemts to read the three messages from the server + for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { + cli_stream->Read(&recv_response, tag(tag_idx)); + Verifier(GetParam()) + .Expect(tag_idx, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + } + + // The RPC has been cancelled at this point for sure (i.e irrespective of + // the value of `server_try_cancel` is). So, from this point forward, we + // know that cq results are supposed to return false on server. + + // Server finishes the stream (but the RPC is already cancelled) + srv_stream.Finish(Status::CANCELLED, tag(9)); + Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + + // Client will see the cancellation + cli_stream->Finish(&recv_status, tag(10)); + Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + EXPECT_FALSE(recv_status.ok()); + EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + } + + // Helper for testing bidirectinal-streaming RPCs which are cancelled on the + // server. + // + // Depending on the value of server_try_cancel parameter, this will + // test one of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/ + // writing any messages from/to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // messages from the client (but before sending any status back to the + // client) + void TestBidiStreamingServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + // Initiate the call from the client side + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + + // On the server, request to be notified of the 'BidiStream' call and + // receive the call just made by the client + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + + // Client sends the first and the only message + send_request.set_message("Ping"); + cli_stream->Write(send_request, tag(3)); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + + bool expected_cq_result = true; + bool ignore_cq_result = false; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(&srv_ctx); + + // We know for sure that all cq results will be false from this point + // since the server cancelled the RPC + expected_cq_result = false; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + + // Since server is going to cancel the RPC in a parallel thread, some of + // the cq results (i.e those until the cancellation) might be true. Since + // that number is non-deterministic, it is better to ignore the cq results + ignore_cq_result = true; + } + + srv_stream.Read(&recv_request, tag(4)); + Verifier(GetParam()) + .Expect(4, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + + send_response.set_message("Pong"); + srv_stream.Write(send_response, tag(5)); + Verifier(GetParam()) + .Expect(5, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + + cli_stream->Read(&recv_response, tag(6)); + Verifier(GetParam()) + .Expect(6, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + + // This is expected to succeed in all cases + cli_stream->WritesDone(tag(7)); + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); + + // This is expected to fail in all cases i.e for all values of + // server_try_cancel. This is becasue at this point, either there are no + // more msgs from the client (because client called WritesDone) or the RPC + // is cancelled on the server + srv_stream.Read(&recv_request, tag(8)); + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(&srv_ctx); + } + + // The RPC has been cancelled at this point for sure (i.e irrespective of + // the value of `server_try_cancel` is). So, from this point forward, we + // know that cq results are supposed to return false on server. + + srv_stream.Finish(Status::CANCELLED, tag(9)); + Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(10)); + Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + EXPECT_FALSE(recv_status.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code()); + } +}; + +TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) { + TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) { + TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) { + TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) { + TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) { + TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) { + TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) { + TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) { + TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) { + TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING); +} + INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest, ::testing::Values(false, true)); +INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, + AsyncEnd2endServerTryCancelTest, + ::testing::Values(false)); } // namespace } // namespace testing diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index cdc123d124..65da71b391 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -306,6 +306,301 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, } } +// This class is for testing scenarios where RPCs are cancelled on the server +// by calling ServerContext::TryCancel() +class End2endServerTryCancelTest : public End2endTest { + protected: + // Helper for testing client-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading + // any messages from the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // the messages from the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. + void TestRequestStreamServerCancel( + ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + // Send server_try_cancel value in the client metadata + context.AddMetadata(kServerTryCancelRequest, + std::to_string(server_try_cancel)); + + auto stream = stub_->RequestStream(&context, &response); + + int num_msgs_sent = 0; + while (num_msgs_sent < num_msgs_to_send) { + request.set_message("hello"); + if (!stream->Write(request)) { + break; + } + num_msgs_sent++; + } + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + + stream->WritesDone(); + Status s = stream->Finish(); + + // At this point, we know for sure that RPC was cancelled by the server + // since we passed server_try_cancel value in the metadata. Depending on the + // value of server_try_cancel, the RPC might have been cancelled by the + // server at different stages. The following validates our expectations of + // number of messages sent in various cancellation scenarios: + + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + case CANCEL_DURING_PROCESSING: + // If the RPC is cancelled by server before / during messages from the + // client, it means that the client most likely did not get a chance to + // send all the messages it wanted to send. i.e num_msgs_sent <= + // num_msgs_to_send + EXPECT_LE(num_msgs_sent, num_msgs_to_send); + break; + + case CANCEL_AFTER_PROCESSING: + // If the RPC was cancelled after all messages were read by the server, + // the client did get a chance to send all its messages + EXPECT_EQ(num_msgs_sent, num_msgs_to_send); + break; + + default: + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } + + // Helper for testing server-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing + // any messages to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing + // messages to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all + // the messages to the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. + void TestResponseStreamServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + // Send server_try_cancel in the client metadata + context.AddMetadata(kServerTryCancelRequest, + std::to_string(server_try_cancel)); + + request.set_message("hello"); + auto stream = stub_->ResponseStream(&context, request); + + int num_msgs_read = 0; + while (num_msgs_read < kNumResponseStreamsMsgs) { + if (!stream->Read(&response)) { + break; + } + EXPECT_EQ(response.message(), + request.message() + std::to_string(num_msgs_read)); + num_msgs_read++; + } + gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); + + Status s = stream->Finish(); + + // Depending on the value of server_try_cancel, the RPC might have been + // cancelled by the server at different stages. The following validates our + // expectations of number of messages read in various cancellation + // scenarios: + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + // Server cancelled before sending any messages. Which means the client + // wouldn't have read any + EXPECT_EQ(num_msgs_read, 0); + break; + + case CANCEL_DURING_PROCESSING: + // Server cancelled while writing messages. Client must have read less + // than or equal to the expected number of messages + EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); + break; + + case CANCEL_AFTER_PROCESSING: + // Server cancelled after writing all messages. Client must have read + // all messages + EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); + break; + + default: { + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } + + // Helper for testing bidirectional-streaming RPCs which are cancelled on the + // server. Depending on the value of server_try_cancel parameter, this will + // test one of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/ + // writing any messages from/to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/ + // writing messages from/to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing + // all the messages from/to the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. + void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, + int num_messages) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + // Send server_try_cancel in the client metadata + context.AddMetadata(kServerTryCancelRequest, + std::to_string(server_try_cancel)); + + auto stream = stub_->BidiStream(&context); + + int num_msgs_read = 0; + int num_msgs_sent = 0; + while (num_msgs_sent < num_messages) { + request.set_message("hello " + std::to_string(num_msgs_sent)); + if (!stream->Write(request)) { + break; + } + num_msgs_sent++; + + if (!stream->Read(&response)) { + break; + } + num_msgs_read++; + + EXPECT_EQ(response.message(), request.message()); + } + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); + + stream->WritesDone(); + Status s = stream->Finish(); + + // Depending on the value of server_try_cancel, the RPC might have been + // cancelled by the server at different stages. The following validates our + // expectations of number of messages read in various cancellation + // scenarios: + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + EXPECT_EQ(num_msgs_read, 0); + break; + + case CANCEL_DURING_PROCESSING: + EXPECT_LE(num_msgs_sent, num_messages); + EXPECT_LE(num_msgs_read, num_msgs_sent); + break; + + case CANCEL_AFTER_PROCESSING: + EXPECT_EQ(num_msgs_sent, num_messages); + EXPECT_EQ(num_msgs_read, num_msgs_sent); + break; + + default: + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } +}; + +TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerTryCancelRequest, + std::to_string(CANCEL_BEFORE_PROCESSING)); + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); +} + +// Server to cancel before doing reading the request +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) { + TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1); +} + +// Server to cancel while reading a request from the stream in parallel +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) { + TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10); +} + +// Server to cancel after reading all the requests but before returning to the +// client +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) { + TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4); +} + +// Server to cancel before sending any response messages +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) { + TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING); +} + +// Server to cancel while writing a response to the stream in parallel +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) { + TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING); +} + +// Server to cancel after writing all the respones to the stream but before +// returning to the client +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) { + TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING); +} + +// Server to cancel before reading/writing any requests/responses on the stream +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) { + TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2); +} + +// Server to cancel while reading/writing requests/responses on the stream in +// parallel +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) { + TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10); +} + +// Server to cancel after reading/writing all requests/responses on the stream +// but before returning to the client +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { + TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); +} + TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { ResetStub(); std::vector<std::thread*> threads; @@ -1059,6 +1354,9 @@ INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(TestScenario(false, false), TestScenario(false, true))); +INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest, + ::testing::Values(TestScenario(false, false))); + INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest, ::testing::Values(TestScenario(false, false), TestScenario(false, true), diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 0eebe64148..66d11d0dfc 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -33,6 +33,8 @@ #include "test/cpp/end2end/test_service_impl.h" +#include <thread> + #include <grpc++/security/credentials.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> @@ -82,6 +84,17 @@ void CheckServerAuthContext(const ServerContext* context, Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) { + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL + ServerTryCancel(context); + return Status::CANCELLED; + } + response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (host_) { @@ -143,18 +156,39 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, Status TestServiceImpl::RequestStream(ServerContext* context, ServerReader<EchoRequest>* reader, EchoResponse* response) { + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads + // any message from the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + // If 'cancel_after_reads' is set in the metadata AND non-zero, the server + // will cancel the RPC (by just returning Status::CANCELLED - doesn't call + // ServerContext::TryCancel()) after reading the number of records specified + // by the 'cancel_after_reads' value set in the metadata. + int cancel_after_reads = GetIntValueFromMetadata( + kServerCancelAfterReads, context->client_metadata(), 0); + EchoRequest request; response->set_message(""); - int cancel_after_reads = 0; - const std::multimap<grpc::string_ref, grpc::string_ref>& - client_initial_metadata = context->client_metadata(); - if (client_initial_metadata.find(kServerCancelAfterReads) != - client_initial_metadata.end()) { - std::istringstream iss(ToString( - client_initial_metadata.find(kServerCancelAfterReads)->second)); - iss >> cancel_after_reads; - gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + + int num_msgs_read = 0; while (reader->Read(&request)) { if (cancel_after_reads == 1) { gpr_log(GPR_INFO, "return cancel status"); @@ -164,21 +198,65 @@ Status TestServiceImpl::RequestStream(ServerContext* context, } response->mutable_message()->append(request.message()); } + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } -// Return 3 messages. +// Return 'kNumResponseStreamMsgs' messages. // TODO(yangg) make it generic by adding a parameter into EchoRequest Status TestServiceImpl::ResponseStream(ServerContext* context, const EchoRequest* request, ServerWriter<EchoResponse>* writer) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes + // any messages to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // writing messages to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes + // all the messages to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + response.set_message(request->message() + std::to_string(i)); + writer->Write(response); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } return Status::OK; } @@ -186,15 +264,70 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, Status TestServiceImpl::BidiStream( ServerContext* context, ServerReaderWriter<EchoResponse, EchoRequest>* stream) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ + // writes any messages from/to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading/writing messages from/to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server + // reads/writes all messages from/to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + EchoRequest request; EchoResponse response; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + while (stream->Read(&request)) { gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); stream->Write(response); } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } +int TestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; +} + +void TestServiceImpl::ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + EXPECT_TRUE(context->IsCancelled()); +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 2c35b5614c..1ab6ced9e0 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -44,7 +44,16 @@ namespace grpc { namespace testing { +const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; +const char* const kServerTryCancelRequest = "server_try_cancel"; + +typedef enum { + DO_NOT_CANCEL = 0, + CANCEL_BEFORE_PROCESSING, + CANCEL_DURING_PROCESSING, + CANCEL_AFTER_PROCESSING +} ServerTryCancelRequestPhase; class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: @@ -74,6 +83,14 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } private: + int GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value); + + void ServerTryCancel(ServerContext* context); + + private: bool signal_client_; std::mutex mu_; std::unique_ptr<grpc::string> host_; diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index c2f3ad8fde..fad9a323af 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -36,7 +36,6 @@ #include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> -#include <vector> namespace grpc { namespace testing { @@ -46,11 +45,11 @@ namespace testing { #define _GNU_SOURCE #endif #include <sched.h> -int LimitCores(const int *cores, int cores_size) { +int LimitCores(const int* cores, int cores_size) { const int num_cores = gpr_cpu_num_cores(); int cores_set = 0; - cpu_set_t *cpup = CPU_ALLOC(num_cores); + cpu_set_t* cpup = CPU_ALLOC(num_cores); GPR_ASSERT(cpup); const size_t size = CPU_ALLOC_SIZE(num_cores); CPU_ZERO_S(size, cpup); @@ -74,7 +73,7 @@ int LimitCores(const int *cores, int cores_size) { } #else // LimitCores is not currently supported for non-Linux platforms -int LimitCores(std::vector<int> core_vec) { return gpr_cpu_num_cores(); } +int LimitCores(const int*, int) { return gpr_cpu_num_cores(); } #endif } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/limit_cores.h b/test/cpp/qps/limit_cores.h index 5c0d1e315d..5482904a3c 100644 --- a/test/cpp/qps/limit_cores.h +++ b/test/cpp/qps/limit_cores.h @@ -34,8 +34,6 @@ #ifndef TEST_QPS_LIMIT_CORES_H #define TEST_QPS_LIMIT_CORES_H -#include <vector> - namespace grpc { namespace testing { /// LimitCores: allow this worker to only run on the cores specified in the |