diff options
Diffstat (limited to 'test/cpp/end2end/end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 168 |
1 files changed, 167 insertions, 1 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 58b20e61c9..5a414ebc86 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -54,7 +54,6 @@ #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/string_ref_helper.h" using grpc::testing::EchoRequest; @@ -65,6 +64,40 @@ namespace grpc { namespace testing { namespace { +const char* kServerCancelAfterReads = "cancel_after_reads"; + +// When echo_deadline is requested, deadline seen in the ServerContext is set in +// the response in seconds. +void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, + EchoResponse* response) { + if (request->has_param() && request->param().echo_deadline()) { + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); + } + response->mutable_param()->set_request_deadline(deadline.tv_sec); + } +} + +void CheckServerAuthContext(const ServerContext* context, + const grpc::string& expected_client_identity) { + std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); + std::vector<grpc::string_ref> ssl = + auth_ctx->FindPropertyValues("transport_security_type"); + EXPECT_EQ(1u, ssl.size()); + EXPECT_EQ("ssl", ToString(ssl[0])); + if (expected_client_identity.length() == 0) { + EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); + EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); + EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); + } else { + auto identity = auth_ctx->GetPeerIdentity(); + EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); + EXPECT_EQ(1u, identity.size()); + EXPECT_EQ(expected_client_identity, identity[0]); + } +} + bool CheckIsLocalhost(const grpc::string& addr) { const grpc::string kIpv6("ipv6:[::1]:"); const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:"); @@ -179,6 +212,138 @@ class Proxy : public ::grpc::testing::EchoTestService::Service { std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_; }; +class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { + public: + TestServiceImpl() : signal_client_(false), host_() {} + explicit TestServiceImpl(const grpc::string& host) + : signal_client_(false), host_(new grpc::string(host)) {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (host_) { + response->mutable_param()->set_host(*host_); + } + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN))); + } + return Status::CANCELLED; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().server_cancel_after_us(), + GPR_TIMESPAN))); + return Status::CANCELLED; + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { + EXPECT_FALSE(context->IsCancelled()); + } + + if (request->has_param() && request->param().echo_metadata()) { + const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = + context->client_metadata(); + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { + context->AddTrailingMetadata(ToString(iter->first), + ToString(iter->second)); + } + } + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext(context, + request->param().expected_client_identity()); + } + if (request->has_param() && + request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } + if (request->has_param() && request->param().echo_peer()) { + response->mutable_param()->set_peer(context->peer()); + } + return Status::OK; + } + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) GRPC_OVERRIDE { + EchoRequest request; + response->set_message(""); + int cancel_after_reads = 0; + const std::multimap<grpc::string_ref, grpc::string_ref>& + client_initial_metadata = context->client_metadata(); + if (client_initial_metadata.find(kServerCancelAfterReads) != + client_initial_metadata.end()) { + std::istringstream iss(ToString( + client_initial_metadata.find(kServerCancelAfterReads)->second)); + iss >> cancel_after_reads; + gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + } + while (reader->Read(&request)) { + if (cancel_after_reads == 1) { + gpr_log(GPR_INFO, "return cancel status"); + return Status::CANCELLED; + } else if (cancel_after_reads > 0) { + cancel_after_reads--; + } + response->mutable_message()->append(request.message()); + } + return Status::OK; + } + + // Return 3 messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; + } + + Status BidiStream(ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) + GRPC_OVERRIDE { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; + } + + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; + std::unique_ptr<grpc::string> host_; +}; + class TestServiceImplDupPkg : public ::grpc::testing::duplicate::EchoTestService::Service { public: @@ -659,6 +824,7 @@ TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { EchoRequest request; EchoResponse response; request.set_message("Hello"); + request.mutable_param()->set_skip_cancelled_check(true); ClientContext context; std::chrono::system_clock::time_point deadline = |