diff options
author | Sree Kuchibhotla <sreek@google.com> | 2016-02-03 07:38:36 -0800 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2016-02-03 07:41:31 -0800 |
commit | 9bc6fa0a40edb9014922e579346a6fce888303d2 (patch) | |
tree | 37a529410b8961b38974d41141eec7a933abb18d /test/cpp/end2end/test_service_impl.cc | |
parent | a0a8eaab0e86148f40fb629a36c0529eea9e1b35 (diff) | |
parent | 6b4ec07ec9028e6c4727a3f1b83a166087d44f11 (diff) |
Merge branch 'master' into server_try_cancel_api
Diffstat (limited to 'test/cpp/end2end/test_service_impl.cc')
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 175 |
1 files changed, 154 insertions, 21 deletions
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index c9a32ecf5a..ac97cb435c 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -33,6 +33,8 @@ #include "test/cpp/end2end/test_service_impl.h" +#include <thread> + #include <grpc++/security/credentials.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> @@ -82,6 +84,17 @@ void CheckServerAuthContext(const ServerContext* context, Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) { + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL + ServerTryCancel(context); + return Status::CANCELLED; + } + response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (host_) { @@ -106,17 +119,17 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, gpr_time_from_micros(request->param().server_cancel_after_us(), GPR_TIMESPAN))); return Status::CANCELLED; - } else { + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { EXPECT_FALSE(context->IsCancelled()); } if (request->has_param() && request->param().echo_metadata()) { const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = context->client_metadata(); - for ( - std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter = - client_metadata.begin(); - iter != client_metadata.end(); ++iter) { + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { context->AddTrailingMetadata(ToString(iter->first), ToString(iter->second)); } @@ -142,18 +155,39 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, Status TestServiceImpl::RequestStream(ServerContext* context, ServerReader<EchoRequest>* reader, EchoResponse* response) { + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads + // any message from the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + // If 'cancel_after_reads' is set in the metadata AND non-zero, the server + // will cancel the RPC (by just returning Status::CANCELLED - doesn't call + // ServerContext::TryCancel()) after reading the number of records specified + // by the 'cancel_after_reads' value set in the metadata. + int cancel_after_reads = GetIntValueFromMetadata( + kServerCancelAfterReads, context->client_metadata(), 0); + EchoRequest request; response->set_message(""); - int cancel_after_reads = 0; - const std::multimap<grpc::string_ref, grpc::string_ref>& - client_initial_metadata = context->client_metadata(); - if (client_initial_metadata.find(kServerCancelAfterReads) != - client_initial_metadata.end()) { - std::istringstream iss(ToString( - client_initial_metadata.find(kServerCancelAfterReads)->second)); - iss >> cancel_after_reads; - gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + + int num_msgs_read = 0; while (reader->Read(&request)) { if (cancel_after_reads == 1) { gpr_log(GPR_INFO, "return cancel status"); @@ -163,21 +197,65 @@ Status TestServiceImpl::RequestStream(ServerContext* context, } response->mutable_message()->append(request.message()); } + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } -// Return 3 messages. +// Return 'kNumResponseStreamMsgs' messages. // TODO(yangg) make it generic by adding a parameter into EchoRequest Status TestServiceImpl::ResponseStream(ServerContext* context, const EchoRequest* request, ServerWriter<EchoResponse>* writer) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes + // any messages to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // writing messages to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes + // all the messages to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + response.set_message(request->message() + std::to_string(i)); + writer->Write(response); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } return Status::OK; } @@ -185,15 +263,70 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, Status TestServiceImpl::BidiStream( ServerContext* context, ServerReaderWriter<EchoResponse, EchoRequest>* stream) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ + // writes any messages from/to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading/writing messages from/to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server + // reads/writes all messages from/to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + EchoRequest request; EchoResponse response; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + while (stream->Read(&request)) { gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); stream->Write(response); } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } +int TestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; +} + +void TestServiceImpl::ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + EXPECT_TRUE(context->IsCancelled()); +} + } // namespace testing } // namespace grpc |