aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/test_service_impl.cc
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-02-03 07:38:36 -0800
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-02-03 07:41:31 -0800
commit9bc6fa0a40edb9014922e579346a6fce888303d2 (patch)
tree37a529410b8961b38974d41141eec7a933abb18d /test/cpp/end2end/test_service_impl.cc
parenta0a8eaab0e86148f40fb629a36c0529eea9e1b35 (diff)
parent6b4ec07ec9028e6c4727a3f1b83a166087d44f11 (diff)
Merge branch 'master' into server_try_cancel_api
Diffstat (limited to 'test/cpp/end2end/test_service_impl.cc')
-rw-r--r--test/cpp/end2end/test_service_impl.cc175
1 files changed, 154 insertions, 21 deletions
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index c9a32ecf5a..ac97cb435c 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -33,6 +33,8 @@
#include "test/cpp/end2end/test_service_impl.h"
+#include <thread>
+
#include <grpc++/security/credentials.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
@@ -82,6 +84,17 @@ void CheckServerAuthContext(const ServerContext* context,
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ if (server_try_cancel > DO_NOT_CANCEL) {
+ // Since this is a unary RPC, by the time this server handler is called,
+ // the 'request' message is already read from the client. So the scenarios
+ // in server_try_cancel don't make much sense. Just cancel the RPC as long
+ // as server_try_cancel is not DO_NOT_CANCEL
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {
@@ -106,17 +119,17 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)));
return Status::CANCELLED;
- } else {
+ } else if (!request->has_param() ||
+ !request->param().skip_cancelled_check()) {
EXPECT_FALSE(context->IsCancelled());
}
if (request->has_param() && request->param().echo_metadata()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
- for (
- std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
- client_metadata.begin();
- iter != client_metadata.end(); ++iter) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = client_metadata.begin();
+ iter != client_metadata.end(); ++iter) {
context->AddTrailingMetadata(ToString(iter->first),
ToString(iter->second));
}
@@ -142,18 +155,39 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
Status TestServiceImpl::RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) {
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
+ // any message from the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ // If 'cancel_after_reads' is set in the metadata AND non-zero, the server
+ // will cancel the RPC (by just returning Status::CANCELLED - doesn't call
+ // ServerContext::TryCancel()) after reading the number of records specified
+ // by the 'cancel_after_reads' value set in the metadata.
+ int cancel_after_reads = GetIntValueFromMetadata(
+ kServerCancelAfterReads, context->client_metadata(), 0);
+
EchoRequest request;
response->set_message("");
- int cancel_after_reads = 0;
- const std::multimap<grpc::string_ref, grpc::string_ref>&
- client_initial_metadata = context->client_metadata();
- if (client_initial_metadata.find(kServerCancelAfterReads) !=
- client_initial_metadata.end()) {
- std::istringstream iss(ToString(
- client_initial_metadata.find(kServerCancelAfterReads)->second));
- iss >> cancel_after_reads;
- gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads);
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
+
+ int num_msgs_read = 0;
while (reader->Read(&request)) {
if (cancel_after_reads == 1) {
gpr_log(GPR_INFO, "return cancel status");
@@ -163,21 +197,65 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
}
response->mutable_message()->append(request.message());
}
+ gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
return Status::OK;
}
-// Return 3 messages.
+// Return 'kNumResponseStreamMsgs' messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status TestServiceImpl::ResponseStream(ServerContext* context,
const EchoRequest* request,
ServerWriter<EchoResponse>* writer) {
+ // If server_try_cancel is set in the metadata, the RPC is cancelled by the
+ // server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
+ // any messages to the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // writing messages to the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
+ // all the messages to the client
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ }
+
+ for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+ response.set_message(request->message() + std::to_string(i));
+ writer->Write(response);
+ }
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
return Status::OK;
}
@@ -185,15 +263,70 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
Status TestServiceImpl::BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
+ // If server_try_cancel is set in the metadata, the RPC is cancelled by the
+ // server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
+ // writes any messages from/to the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // reading/writing messages from/to the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
+ // reads/writes all messages from/to the client
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
EchoRequest request;
EchoResponse response;
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ }
+
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
stream->Write(response);
}
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
return Status::OK;
}
+int TestServiceImpl::GetIntValueFromMetadata(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value) {
+ if (metadata.find(key) != metadata.end()) {
+ std::istringstream iss(ToString(metadata.find(key)->second));
+ iss >> default_value;
+ gpr_log(GPR_INFO, "%s : %d", key, default_value);
+ }
+
+ return default_value;
+}
+
+void TestServiceImpl::ServerTryCancel(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
+ EXPECT_TRUE(context->IsCancelled());
+}
+
} // namespace testing
} // namespace grpc