diff options
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 307 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 175 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 17 |
3 files changed, 172 insertions, 327 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 45a57620c1..65da71b391 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -54,6 +54,7 @@ #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/string_ref_helper.h" using grpc::testing::EchoRequest; @@ -64,48 +65,6 @@ namespace grpc { namespace testing { namespace { -const char* kServerCancelAfterReads = "cancel_after_reads"; -const char* kServerTryCancelRequest = "server_try_cancel"; -typedef enum { - DO_NOT_CANCEL = 0, - CANCEL_BEFORE_PROCESSING, - CANCEL_DURING_PROCESSING, - CANCEL_AFTER_PROCESSING -} ServerTryCancelRequestPhase; -const int kNumResponseStreamsMsgs = 3; - -// When echo_deadline is requested, deadline seen in the ServerContext is set in -// the response in seconds. -void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - if (request->has_param() && request->param().echo_deadline()) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - if (context->deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->deadline(), &deadline); - } - response->mutable_param()->set_request_deadline(deadline.tv_sec); - } -} - -void CheckServerAuthContext(const ServerContext* context, - const grpc::string& expected_client_identity) { - std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); - std::vector<grpc::string_ref> ssl = - auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, ssl.size()); - EXPECT_EQ("ssl", ToString(ssl[0])); - if (expected_client_identity.length() == 0) { - EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); - EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); - EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); - } else { - auto identity = auth_ctx->GetPeerIdentity(); - EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); - EXPECT_EQ(1u, identity.size()); - EXPECT_EQ(expected_client_identity, identity[0]); - } -} - bool CheckIsLocalhost(const grpc::string& addr) { const grpc::string kIpv6("ipv6:[::1]:"); const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:"); @@ -220,270 +179,6 @@ class Proxy : public ::grpc::testing::EchoTestService::Service { std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_; }; -class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { - public: - TestServiceImpl() : signal_client_(false), host_() {} - explicit TestServiceImpl(const grpc::string& host) - : signal_client_(false), host_(new grpc::string(host)) {} - - int GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - if (metadata.find(key) != metadata.end()) { - std::istringstream iss(ToString(metadata.find(key)->second)); - iss >> default_value; - gpr_log(GPR_INFO, "%s : %d", key, default_value); - } - - return default_value; - } - - void ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - EXPECT_TRUE(context->IsCancelled()); - } - - Status Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) GRPC_OVERRIDE { - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - if (server_try_cancel > DO_NOT_CANCEL) { - // Since this is a unary RPC, by the time this server handler is called, - // the 'request' message is already read from the client. So the scenarios - // in server_try_cancel don't make much sense. Just cancel the RPC as long - // as server_try_cancel is not DO_NOT_CANCEL - ServerTryCancel(context); - return Status::CANCELLED; - } - - response->set_message(request->message()); - MaybeEchoDeadline(context, request, response); - if (host_) { - response->mutable_param()->set_host(*host_); - } - if (request->has_param() && request->param().client_cancel_after_us()) { - { - std::unique_lock<std::mutex> lock(mu_); - signal_client_ = true; - } - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), - GPR_TIMESPAN))); - } - return Status::CANCELLED; - } else if (request->has_param() && - request->param().server_cancel_after_us()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().server_cancel_after_us(), - GPR_TIMESPAN))); - return Status::CANCELLED; - } else if (!request->has_param() || - !request->param().skip_cancelled_check()) { - EXPECT_FALSE(context->IsCancelled()); - } - - if (request->has_param() && request->param().echo_metadata()) { - const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = - context->client_metadata(); - for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator - iter = client_metadata.begin(); - iter != client_metadata.end(); ++iter) { - context->AddTrailingMetadata(ToString(iter->first), - ToString(iter->second)); - } - } - if (request->has_param() && - (request->param().expected_client_identity().length() > 0 || - request->param().check_auth_context())) { - CheckServerAuthContext(context, - request->param().expected_client_identity()); - } - if (request->has_param() && - request->param().response_message_length() > 0) { - response->set_message( - grpc::string(request->param().response_message_length(), '\0')); - } - if (request->has_param() && request->param().echo_peer()) { - response->mutable_param()->set_peer(context->peer()); - } - return Status::OK; - } - - // Unimplemented is left unimplemented to test the returned error. - - Status RequestStream(ServerContext* context, - ServerReader<EchoRequest>* reader, - EchoResponse* response) GRPC_OVERRIDE { - // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by - // the server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads - // any message from the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // reading messages from the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads - // all the messages from the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - // If 'cancel_after_reads' is set in the metadata AND non-zero, the server - // will cancel the RPC (by just returning Status::CANCELLED - doesn't call - // ServerContext::TryCancel()) after reading the number of records specified - // by the 'cancel_after_reads' value set in the metadata. - int cancel_after_reads = GetIntValueFromMetadata( - kServerCancelAfterReads, context->client_metadata(), 0); - - EchoRequest request; - response->set_message(""); - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - std::thread* server_try_cancel_thd = NULL; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); - } - - int num_msgs_read = 0; - while (reader->Read(&request)) { - num_msgs_read++; - if (cancel_after_reads == 1) { - gpr_log(GPR_INFO, "return cancel status"); - return Status::CANCELLED; - } else if (cancel_after_reads > 0) { - cancel_after_reads--; - } - response->mutable_message()->append(request.message()); - } - gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); - - if (server_try_cancel_thd != NULL) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; - } - - // Return 'kNumResponseStreamMsgs' messages. - // TODO(yangg) make it generic by adding a parameter into EchoRequest - Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { - // If server_try_cancel is set in the metadata, the RPC is cancelled by the - // server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes - // any messages to the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // writing messages to the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes - // all the messages to the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - EchoResponse response; - std::thread* server_try_cancel_thd = NULL; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); - } - - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { - response.set_message(request->message() + std::to_string(i)); - writer->Write(response); - } - - if (server_try_cancel_thd != NULL) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; - } - - Status BidiStream(ServerContext* context, - ServerReaderWriter<EchoResponse, EchoRequest>* stream) - GRPC_OVERRIDE { - // If server_try_cancel is set in the metadata, the RPC is cancelled by the - // server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ - // writes any messages from/to the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // reading/writing messages from/to the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server - // reads/writes all messages from/to the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - EchoRequest request; - EchoResponse response; - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - std::thread* server_try_cancel_thd = NULL; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); - } - - while (stream->Read(&request)) { - gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); - response.set_message(request.message()); - stream->Write(response); - } - - if (server_try_cancel_thd != NULL) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; - } - - bool signal_client() { - std::unique_lock<std::mutex> lock(mu_); - return signal_client_; - } - - private: - bool signal_client_; - std::mutex mu_; - std::unique_ptr<grpc::string> host_; -}; - class TestServiceImplDupPkg : public ::grpc::testing::duplicate::EchoTestService::Service { public: diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index c9a32ecf5a..ac97cb435c 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -33,6 +33,8 @@ #include "test/cpp/end2end/test_service_impl.h" +#include <thread> + #include <grpc++/security/credentials.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> @@ -82,6 +84,17 @@ void CheckServerAuthContext(const ServerContext* context, Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) { + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL + ServerTryCancel(context); + return Status::CANCELLED; + } + response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (host_) { @@ -106,17 +119,17 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, gpr_time_from_micros(request->param().server_cancel_after_us(), GPR_TIMESPAN))); return Status::CANCELLED; - } else { + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { EXPECT_FALSE(context->IsCancelled()); } if (request->has_param() && request->param().echo_metadata()) { const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = context->client_metadata(); - for ( - std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter = - client_metadata.begin(); - iter != client_metadata.end(); ++iter) { + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { context->AddTrailingMetadata(ToString(iter->first), ToString(iter->second)); } @@ -142,18 +155,39 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, Status TestServiceImpl::RequestStream(ServerContext* context, ServerReader<EchoRequest>* reader, EchoResponse* response) { + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads + // any message from the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + // If 'cancel_after_reads' is set in the metadata AND non-zero, the server + // will cancel the RPC (by just returning Status::CANCELLED - doesn't call + // ServerContext::TryCancel()) after reading the number of records specified + // by the 'cancel_after_reads' value set in the metadata. + int cancel_after_reads = GetIntValueFromMetadata( + kServerCancelAfterReads, context->client_metadata(), 0); + EchoRequest request; response->set_message(""); - int cancel_after_reads = 0; - const std::multimap<grpc::string_ref, grpc::string_ref>& - client_initial_metadata = context->client_metadata(); - if (client_initial_metadata.find(kServerCancelAfterReads) != - client_initial_metadata.end()) { - std::istringstream iss(ToString( - client_initial_metadata.find(kServerCancelAfterReads)->second)); - iss >> cancel_after_reads; - gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + + int num_msgs_read = 0; while (reader->Read(&request)) { if (cancel_after_reads == 1) { gpr_log(GPR_INFO, "return cancel status"); @@ -163,21 +197,65 @@ Status TestServiceImpl::RequestStream(ServerContext* context, } response->mutable_message()->append(request.message()); } + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } -// Return 3 messages. +// Return 'kNumResponseStreamMsgs' messages. // TODO(yangg) make it generic by adding a parameter into EchoRequest Status TestServiceImpl::ResponseStream(ServerContext* context, const EchoRequest* request, ServerWriter<EchoResponse>* writer) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes + // any messages to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // writing messages to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes + // all the messages to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + response.set_message(request->message() + std::to_string(i)); + writer->Write(response); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } return Status::OK; } @@ -185,15 +263,70 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, Status TestServiceImpl::BidiStream( ServerContext* context, ServerReaderWriter<EchoResponse, EchoRequest>* stream) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ + // writes any messages from/to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading/writing messages from/to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server + // reads/writes all messages from/to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + EchoRequest request; EchoResponse response; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + while (stream->Read(&request)) { gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); stream->Write(response); } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } +int TestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; +} + +void TestServiceImpl::ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + EXPECT_TRUE(context->IsCancelled()); +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 2c35b5614c..1ab6ced9e0 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -44,7 +44,16 @@ namespace grpc { namespace testing { +const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; +const char* const kServerTryCancelRequest = "server_try_cancel"; + +typedef enum { + DO_NOT_CANCEL = 0, + CANCEL_BEFORE_PROCESSING, + CANCEL_DURING_PROCESSING, + CANCEL_AFTER_PROCESSING +} ServerTryCancelRequestPhase; class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: @@ -74,6 +83,14 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } private: + int GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value); + + void ServerTryCancel(ServerContext* context); + + private: bool signal_client_; std::mutex mu_; std::unique_ptr<grpc::string> host_; |