diff options
Diffstat (limited to 'test/cpp/end2end/end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 184 |
1 files changed, 167 insertions, 17 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 41c2669533..5e89490ecb 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -31,13 +31,12 @@ * */ -#include <chrono> #include <thread> +#include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/util/echo_duplicate.pb.h" -#include "test/cpp/util/echo.pb.h" -#include "src/cpp/util/time.h" +#include "test/cpp/util/echo_duplicate.grpc.pb.h" +#include "test/cpp/util/echo.grpc.pb.h" #include "src/cpp/server/thread_pool.h" #include <grpc++/channel_arguments.h> #include <grpc++/channel_interface.h> @@ -50,7 +49,7 @@ #include <grpc++/server_credentials.h> #include <grpc++/status.h> #include <grpc++/stream.h> -#include "test/core/util/port.h" +#include <grpc++/time.h> #include <gtest/gtest.h> #include <grpc/grpc.h> @@ -72,8 +71,8 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, EchoResponse* response) { if (request->has_param() && request->param().echo_deadline()) { gpr_timespec deadline = gpr_inf_future; - if (context->absolute_deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->absolute_deadline(), &deadline); + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); } response->mutable_param()->set_request_deadline(deadline.tv_sec); } @@ -83,10 +82,30 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { public: + TestServiceImpl() : signal_client_(false) {} + Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) GRPC_OVERRIDE { response->set_message(request->message()); MaybeEchoDeadline(context, request, response); + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + std::this_thread::sleep_for(std::chrono::microseconds( + request->param().client_cancel_after_us())); + } + return Status::Cancelled; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + std::this_thread::sleep_for( + std::chrono::microseconds(request->param().server_cancel_after_us())); + return Status::Cancelled; + } else { + EXPECT_FALSE(context->IsCancelled()); + } return Status::OK; } @@ -130,6 +149,15 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { } return Status::OK; } + + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; }; class TestServiceImplDupPkg @@ -151,7 +179,8 @@ class End2endTest : public ::testing::Test { server_address_ << "localhost:" << port; // Setup server ServerBuilder builder; - builder.AddListeningPort(server_address_.str(), InsecureServerCredentials()); + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); builder.RegisterService(&service_); builder.RegisterService(&dup_pkg_service_); builder.SetThreadPool(&thread_pool_); @@ -215,7 +244,7 @@ TEST_F(End2endTest, RpcDeadlineExpires) { ClientContext context; std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::microseconds(10); - context.set_absolute_deadline(deadline); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.code()); } @@ -230,7 +259,7 @@ TEST_F(End2endTest, RpcLongDeadline) { ClientContext context; std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::hours(1); - context.set_absolute_deadline(deadline); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.IsOk()); @@ -247,7 +276,7 @@ TEST_F(End2endTest, EchoDeadline) { ClientContext context; std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(100); - context.set_absolute_deadline(deadline); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.IsOk()); @@ -398,7 +427,7 @@ TEST_F(End2endTest, DiffPackageServices) { // rpc and stream should fail on bad credentials. TEST_F(End2endTest, BadCredentials) { std::unique_ptr<Credentials> bad_creds = - ServiceAccountCredentials("", "", std::chrono::seconds(1)); + ServiceAccountCredentials("", "", 1); EXPECT_EQ(nullptr, bad_creds.get()); std::shared_ptr<ChannelInterface> channel = CreateChannel(server_address_.str(), bad_creds, ChannelArguments()); @@ -423,15 +452,136 @@ TEST_F(End2endTest, BadCredentials) { EXPECT_EQ("Rpc sent on a lame channel.", s.details()); } +void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); + while (!service->signal_client()) { + } + context->TryCancel(); +} + +// Client cancels rpc after 10ms +TEST_F(End2endTest, ClientCancelsRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + const int kCancelDelayUs = 10 * 1000; + request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); + + ClientContext context; + std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); + Status s = stub_->Echo(&context, request, &response); + cancel_thread.join(); + EXPECT_EQ(StatusCode::CANCELLED, s.code()); + EXPECT_EQ(s.details(), "Cancelled"); +} + +// Server cancels rpc after 1ms +TEST_F(End2endTest, ServerCancelsRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_server_cancel_after_us(1000); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::CANCELLED, s.code()); + EXPECT_TRUE(s.details().empty()); +} + +// Client cancels request stream after sending two messages +TEST_F(End2endTest, ClientCancelsRequestStream) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + + auto stream = stub_->RequestStream(&context, &response); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Write(request)); + + context.TryCancel(); + + Status s = stream->Finish(); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code()); + + EXPECT_EQ(response.message(), ""); + +} + +// Client cancels server stream after sending some messages +TEST_F(End2endTest, ClientCancelsResponseStream) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + + auto stream = stub_->ResponseStream(&context, request); + + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1"); + + context.TryCancel(); + + // The cancellation races with responses, so there might be zero or + // one responses pending, read till failure + + if (stream->Read(&response)) { + EXPECT_EQ(response.message(), request.message() + "2"); + // Since we have cancelled, we expect the next attempt to read to fail + EXPECT_FALSE(stream->Read(&response)); + } + + Status s = stream->Finish(); + // The final status could be either of CANCELLED or OK depending on + // who won the race. + EXPECT_GE(grpc::StatusCode::CANCELLED, s.code()); +} + +// Client cancels bidi stream after sending some messages +TEST_F(End2endTest, ClientCancelsBidi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + + context.TryCancel(); + + // The cancellation races with responses, so there might be zero or + // one responses pending, read till failure + + if (stream->Read(&response)) { + EXPECT_EQ(response.message(), request.message()); + // Since we have cancelled, we expect the next attempt to read to fail + EXPECT_FALSE(stream->Read(&response)); + } + + Status s = stream->Finish(); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code()); +} + + } // namespace testing } // namespace grpc int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - google::protobuf::ShutdownProtobufLibrary(); - return result; + return RUN_ALL_TESTS(); } |