Diffstat (limited to 'test/cpp/end2end/end2end_test.cc')
-rw-r--r--  test/cpp/end2end/end2end_test.cc | 184
1 file changed, 167 insertions(+), 17 deletions(-)
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 41c2669533..5e89490ecb 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -31,13 +31,12 @@
*
*/
-#include <chrono>
#include <thread>
+#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-#include "test/cpp/util/echo_duplicate.pb.h"
-#include "test/cpp/util/echo.pb.h"
-#include "src/cpp/util/time.h"
+#include "test/cpp/util/echo_duplicate.grpc.pb.h"
+#include "test/cpp/util/echo.grpc.pb.h"
#include "src/cpp/server/thread_pool.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
@@ -50,7 +49,7 @@
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
-#include "test/core/util/port.h"
+#include <grpc++/time.h>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
@@ -72,8 +71,8 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
if (request->has_param() && request->param().echo_deadline()) {
gpr_timespec deadline = gpr_inf_future;
- if (context->absolute_deadline() != system_clock::time_point::max()) {
- Timepoint2Timespec(context->absolute_deadline(), &deadline);
+ if (context->deadline() != system_clock::time_point::max()) {
+ Timepoint2Timespec(context->deadline(), &deadline);
}
response->mutable_param()->set_request_deadline(deadline.tv_sec);
}
@@ -83,10 +82,30 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
public:
+ TestServiceImpl() : signal_client_(false) {}
+
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) GRPC_OVERRIDE {
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
+ if (request->has_param() && request->param().client_cancel_after_us()) {
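+ // Tell the test's cancel thread that the handler has started, then
+ // spin until the client actually cancels this RPC.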
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ signal_client_ = true;
+ }
+ while (!context->IsCancelled()) {
+ std::this_thread::sleep_for(std::chrono::microseconds(
+ request->param().client_cancel_after_us()));
+ }
+ return Status::Cancelled;
+ } else if (request->has_param() &&
+ request->param().server_cancel_after_us()) {
+ std::this_thread::sleep_for(
+ std::chrono::microseconds(request->param().server_cancel_after_us()));
+ return Status::Cancelled;
+ } else {
+ EXPECT_FALSE(context->IsCancelled());
+ }
return Status::OK;
}
@@ -130,6 +149,15 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
}
return Status::OK;
}
+
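+ // Returns true once Echo() has started handling a request that asks for
+ // client-side cancellation; lets the cancelling thread know when to act.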
+ bool signal_client() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return signal_client_;
+ }
+
+ private:
+ bool signal_client_;
+ std::mutex mu_;
};
class TestServiceImplDupPkg
@@ -151,7 +179,8 @@ class End2endTest : public ::testing::Test {
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
- builder.AddListeningPort(server_address_.str(), InsecureServerCredentials());
+ builder.AddListeningPort(server_address_.str(),
+ InsecureServerCredentials());
builder.RegisterService(&service_);
builder.RegisterService(&dup_pkg_service_);
builder.SetThreadPool(&thread_pool_);
@@ -215,7 +244,7 @@ TEST_F(End2endTest, RpcDeadlineExpires) {
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::microseconds(10);
- context.set_absolute_deadline(deadline);
+ context.set_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.code());
}
@@ -230,7 +259,7 @@ TEST_F(End2endTest, RpcLongDeadline) {
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::hours(1);
- context.set_absolute_deadline(deadline);
+ context.set_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
@@ -247,7 +276,7 @@ TEST_F(End2endTest, EchoDeadline) {
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(100);
- context.set_absolute_deadline(deadline);
+ context.set_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
@@ -398,7 +427,7 @@ TEST_F(End2endTest, DiffPackageServices) {
// rpc and stream should fail on bad credentials.
TEST_F(End2endTest, BadCredentials) {
std::unique_ptr<Credentials> bad_creds =
- ServiceAccountCredentials("", "", std::chrono::seconds(1));
+ ServiceAccountCredentials("", "", 1);
EXPECT_EQ(nullptr, bad_creds.get());
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), bad_creds, ChannelArguments());
@@ -423,15 +452,136 @@ TEST_F(End2endTest, BadCredentials) {
EXPECT_EQ("Rpc sent on a lame channel.", s.details());
}
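+// Runs on a separate thread: sleep for delay_us, wait until the server
+// handler has seen the request, then cancel the RPC from the client side.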
+void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
+ std::this_thread::sleep_for(std::chrono::microseconds(delay_us));
+ while (!service->signal_client()) {
+ }
+ context->TryCancel();
+}
+
+// Client cancels rpc after 10ms
+TEST_F(End2endTest, ClientCancelsRpc) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello");
+ const int kCancelDelayUs = 10 * 1000;
+ request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
+
+ ClientContext context;
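+ // Cancel the RPC from a second thread while the server handler is
+ // spinning and waiting for the cancellation.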
+ std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_);
+ Status s = stub_->Echo(&context, request, &response);
+ cancel_thread.join();
+ EXPECT_EQ(StatusCode::CANCELLED, s.code());
+ EXPECT_EQ(s.details(), "Cancelled");
+}
+
+// Server cancels rpc after 1ms
+TEST_F(End2endTest, ServerCancelsRpc) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello");
+ request.mutable_param()->set_server_cancel_after_us(1000);
+
+ ClientContext context;
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_EQ(StatusCode::CANCELLED, s.code());
+ EXPECT_TRUE(s.details().empty());
+}
+
+// Client cancels request stream after sending two messages
+TEST_F(End2endTest, ClientCancelsRequestStream) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+
+ auto stream = stub_->RequestStream(&context, &response);
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Write(request));
+
+ context.TryCancel();
+
+ Status s = stream->Finish();
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code());
+
+ EXPECT_EQ(response.message(), "");
+}
+
+// Client cancels server stream after sending some messages
+TEST_F(End2endTest, ClientCancelsResponseStream) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+
+ auto stream = stub_->ResponseStream(&context, request);
+
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "0");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "1");
+
+ context.TryCancel();
+
+ // The cancellation races with the responses, so there might be zero or
+ // one response still pending; read until the stream fails.
+
+ if (stream->Read(&response)) {
+ EXPECT_EQ(response.message(), request.message() + "2");
+ // Since we have cancelled, we expect the next read attempt to fail.
+ EXPECT_FALSE(stream->Read(&response));
+ }
+
+ Status s = stream->Finish();
+ // The final status could be either CANCELLED or OK, depending on
+ // which side won the race.
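+ // EXPECT_GE accepts both outcomes: StatusCode::OK is 0 and CANCELLED is 1,
+ // so "CANCELLED >= s.code()" holds in either case.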
+ EXPECT_GE(grpc::StatusCode::CANCELLED, s.code());
+}
+
+// Client cancels bidi stream after sending some messages
+TEST_F(End2endTest, ClientCancelsBidi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ grpc::string msg("hello");
+
+ auto stream = stub_->BidiStream(&context);
+
+ request.set_message(msg + "0");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "1");
+ EXPECT_TRUE(stream->Write(request));
+
+ context.TryCancel();
+
+ // The cancellation races with the responses, so there might be zero or
+ // one response still pending; read until the stream fails.
+
+ if (stream->Read(&response)) {
+ EXPECT_EQ(response.message(), request.message());
+ // Since we have cancelled, we expect the next read attempt to fail.
+ EXPECT_FALSE(stream->Read(&response));
+ }
+
+ Status s = stream->Finish();
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code());
+}
+
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
- grpc_init();
::testing::InitGoogleTest(&argc, argv);
- int result = RUN_ALL_TESTS();
- grpc_shutdown();
- google::protobuf::ShutdownProtobufLibrary();
- return result;
+ return RUN_ALL_TESTS();
}