aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2016-02-04 11:50:45 -0800
committerGravatar vjpai <vpai@google.com>2016-02-04 11:50:45 -0800
commited53c082385a208576d5fe499285d7889652e853 (patch)
tree3159a55631209bd0d1d4b5cb2bef2c7fe3830fce /test/cpp
parent2066443b7c5815b4cc2b2ca7a41f78f5d2bfc0c2 (diff)
parent3ad28d0f1ce2274dad4e28827b5bc5456f9e1fc6 (diff)
Merge branch 'master' into worker_quit
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc410
-rw-r--r--test/cpp/end2end/end2end_test.cc465
-rw-r--r--test/cpp/end2end/hybrid_end2end_test.cc2
-rw-r--r--test/cpp/end2end/test_service_impl.cc168
-rw-r--r--test/cpp/end2end/test_service_impl.h17
-rw-r--r--test/cpp/end2end/thread_stress_test.cc139
-rw-r--r--test/cpp/qps/limit_cores.cc7
-rw-r--r--test/cpp/qps/limit_cores.h2
8 files changed, 991 insertions, 219 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 0616cc07ee..252bda3798 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -32,6 +32,7 @@
*/
#include <memory>
+#include <thread>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
@@ -104,7 +105,10 @@ class Verifier : public PollingCheckRegion {
expectations_[tag(i)] = expect_ok;
return *this;
}
- void Verify(CompletionQueue* cq) {
+
+ void Verify(CompletionQueue* cq) { Verify(cq, false); }
+
+ void Verify(CompletionQueue* cq, bool ignore_ok) {
GPR_ASSERT(!expectations_.empty());
while (!expectations_.empty()) {
bool ok;
@@ -122,7 +126,9 @@ class Verifier : public PollingCheckRegion {
}
auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end());
- EXPECT_EQ(it->second, ok);
+ if (!ignore_ok) {
+ EXPECT_EQ(it->second, ok);
+ }
expectations_.erase(it);
}
}
@@ -217,7 +223,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -270,7 +276,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
std::chrono::system_clock::time_point time_now(
@@ -315,7 +321,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
+ std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
@@ -368,7 +374,7 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
@@ -418,7 +424,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
@@ -476,7 +482,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -519,7 +525,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -568,7 +574,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -629,7 +635,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@@ -690,7 +696,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
@@ -725,7 +731,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
@@ -759,7 +765,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
ClientContext cli_ctx;
send_request.set_message("Hello");
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
@@ -769,8 +775,384 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
EXPECT_EQ("", recv_status.error_message());
}
+// This class is for testing scenarios where RPCs are cancelled on the server
+// by calling ServerContext::TryCancel()
+class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
+ protected:
+ typedef enum {
+ DO_NOT_CANCEL = 0,
+ CANCEL_BEFORE_PROCESSING,
+ CANCEL_DURING_PROCESSING,
+ CANCEL_AFTER_PROCESSING
+ } ServerTryCancelRequestPhase;
+
+ void ServerTryCancel(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel()");
+ EXPECT_TRUE(context->IsCancelled());
+ }
+
+ // Helper for testing client-streaming RPCs which are cancelled on the server.
+ // Depending on the value of server_try_cancel parameter, this will test one
+ // of the following three scenarios:
+ // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
+ // any messages from the client
+ //
+ // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
+ // messages from the client
+ //
+ // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
+ // messages from the client (but before sending any status back to the
+ // client)
+ void TestClientStreamingServerCancel(
+ ServerTryCancelRequestPhase server_try_cancel) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ // Initiate the 'RequestStream' call on client
+ std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
+ Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+
+ // On the server, request to be notified of 'RequestStream' calls
+ // and receive the 'RequestStream' call just made by the client
+ service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+
+ // Client sends 3 messages (tags 3, 4 and 5)
+ for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
+ send_request.set_message("Ping " + std::to_string(tag_idx));
+ cli_stream->Write(send_request, tag(tag_idx));
+ Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
+ }
+ cli_stream->WritesDone(tag(6));
+ Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+
+ bool expected_server_cq_result = true;
+ bool ignore_cq_result = false;
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(&srv_ctx);
+
+ // Since cancellation is done before server reads any results, we know
+ // for sure that all cq results will return false from this point forward
+ expected_server_cq_result = false;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd = new std::thread(
+ &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+ // Server will cancel the RPC in a parallel thread while reading the
+ // requests from the client. Since the cancellation can happen at anytime,
+ // some of the cq results (i.e those until cancellation) might be true but
+ // its non deterministic. So better to ignore the cq results
+ ignore_cq_result = true;
+ }
+
+ // Server reads 3 messages (tags 6, 7 and 8)
+ for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
+ srv_stream.Read(&recv_request, tag(tag_idx));
+ Verifier(GetParam())
+ .Expect(tag_idx, expected_server_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
+ }
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(&srv_ctx);
+ }
+
+ // The RPC has been cancelled at this point for sure (i.e irrespective of
+ // the value of `server_try_cancel` is). So, from this point forward, we
+ // know that cq results are supposed to return false on server.
+
+ // Server sends the final message and cancelled status (but the RPC is
+ // already cancelled at this point. So we expect the operation to fail)
+ srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
+ Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+
+ // Client will see the cancellation
+ cli_stream->Finish(&recv_status, tag(10));
+ // TODO(sreek): The expectation here should be true. This is a bug (github
+ // issue #4972)
+ Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
+ EXPECT_FALSE(recv_status.ok());
+ EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
+ }
+
+ // Helper for testing server-streaming RPCs which are cancelled on the server.
+ // Depending on the value of server_try_cancel parameter, this will test one
+ // of the following three scenarios:
+ // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
+ // any messages to the client
+ //
+ // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
+ // messages to the client
+ //
+ // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
+ // messages to the client (but before sending any status back to the
+ // client)
+ void TestServerStreamingServerCancel(
+ ServerTryCancelRequestPhase server_try_cancel) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message("Ping");
+ // Initiate the 'ResponseStream' call on the client
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+ Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ // On the server, request to be notified of 'ResponseStream' calls and
+ // receive the call just made by the client
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ bool expected_cq_result = true;
+ bool ignore_cq_result = false;
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(&srv_ctx);
+
+ // We know for sure that all cq results will be false from this point
+ // since the server cancelled the RPC
+ expected_cq_result = false;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd = new std::thread(
+ &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+
+ // Server will cancel the RPC in a parallel thread while writing responses
+ // to the client. Since the cancellation can happen at anytime, some of
+ // the cq results (i.e those until cancellation) might be true but it is
+ // non deterministic. So better to ignore the cq results
+ ignore_cq_result = true;
+ }
+
+ // Server sends three messages (tags 3, 4 and 5)
+ for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
+ send_response.set_message("Pong " + std::to_string(tag_idx));
+ srv_stream.Write(send_response, tag(tag_idx));
+ Verifier(GetParam())
+ .Expect(tag_idx, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
+ }
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(&srv_ctx);
+ }
+
+ // Client attemts to read the three messages from the server
+ for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
+ cli_stream->Read(&recv_response, tag(tag_idx));
+ Verifier(GetParam())
+ .Expect(tag_idx, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
+ }
+
+ // The RPC has been cancelled at this point for sure (i.e irrespective of
+ // the value of `server_try_cancel` is). So, from this point forward, we
+ // know that cq results are supposed to return false on server.
+
+ // Server finishes the stream (but the RPC is already cancelled)
+ srv_stream.Finish(Status::CANCELLED, tag(9));
+ Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+
+ // Client will see the cancellation
+ cli_stream->Finish(&recv_status, tag(10));
+ Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ EXPECT_FALSE(recv_status.ok());
+ EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
+ }
+
+ // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
+ // server.
+ //
+ // Depending on the value of server_try_cancel parameter, this will
+ // test one of the following three scenarios:
+ // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
+ // writing any messages from/to the client
+ //
+ // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
+ // messages from the client
+ //
+ // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
+ // messages from the client (but before sending any status back to the
+ // client)
+ void TestBidiStreamingServerCancel(
+ ServerTryCancelRequestPhase server_try_cancel) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ // Initiate the call from the client side
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+ Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+
+ // On the server, request to be notified of the 'BidiStream' call and
+ // receive the call just made by the client
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+
+ // Client sends the first and the only message
+ send_request.set_message("Ping");
+ cli_stream->Write(send_request, tag(3));
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+
+ bool expected_cq_result = true;
+ bool ignore_cq_result = false;
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(&srv_ctx);
+
+ // We know for sure that all cq results will be false from this point
+ // since the server cancelled the RPC
+ expected_cq_result = false;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd = new std::thread(
+ &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+
+ // Since server is going to cancel the RPC in a parallel thread, some of
+ // the cq results (i.e those until the cancellation) might be true. Since
+ // that number is non-deterministic, it is better to ignore the cq results
+ ignore_cq_result = true;
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+ Verifier(GetParam())
+ .Expect(4, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
+
+ send_response.set_message("Pong");
+ srv_stream.Write(send_response, tag(5));
+ Verifier(GetParam())
+ .Expect(5, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
+
+ cli_stream->Read(&recv_response, tag(6));
+ Verifier(GetParam())
+ .Expect(6, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
+
+ // This is expected to succeed in all cases
+ cli_stream->WritesDone(tag(7));
+ Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
+
+ // This is expected to fail in all cases i.e for all values of
+ // server_try_cancel. This is becasue at this point, either there are no
+ // more msgs from the client (because client called WritesDone) or the RPC
+ // is cancelled on the server
+ srv_stream.Read(&recv_request, tag(8));
+ Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(&srv_ctx);
+ }
+
+ // The RPC has been cancelled at this point for sure (i.e irrespective of
+ // the value of `server_try_cancel` is). So, from this point forward, we
+ // know that cq results are supposed to return false on server.
+
+ srv_stream.Finish(Status::CANCELLED, tag(9));
+ Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(10));
+ Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ EXPECT_FALSE(recv_status.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
+ }
+};
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
+ TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
+ TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
+ TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
+ TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
+ TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
+ TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
+ TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
+ TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
+}
+
+TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
+ TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
+}
+
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
::testing::Values(false, true));
+INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
+ AsyncEnd2endServerTryCancelTest,
+ ::testing::Values(false));
} // namespace
} // namespace testing
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 3ad09aca4c..65da71b391 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -54,6 +54,7 @@
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/string_ref_helper.h"
using grpc::testing::EchoRequest;
@@ -64,40 +65,6 @@ namespace grpc {
namespace testing {
namespace {
-const char* kServerCancelAfterReads = "cancel_after_reads";
-
-// When echo_deadline is requested, deadline seen in the ServerContext is set in
-// the response in seconds.
-void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) {
- if (request->has_param() && request->param().echo_deadline()) {
- gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- if (context->deadline() != system_clock::time_point::max()) {
- Timepoint2Timespec(context->deadline(), &deadline);
- }
- response->mutable_param()->set_request_deadline(deadline.tv_sec);
- }
-}
-
-void CheckServerAuthContext(const ServerContext* context,
- const grpc::string& expected_client_identity) {
- std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
- std::vector<grpc::string_ref> ssl =
- auth_ctx->FindPropertyValues("transport_security_type");
- EXPECT_EQ(1u, ssl.size());
- EXPECT_EQ("ssl", ToString(ssl[0]));
- if (expected_client_identity.length() == 0) {
- EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
- EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
- EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
- } else {
- auto identity = auth_ctx->GetPeerIdentity();
- EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
- EXPECT_EQ(1u, identity.size());
- EXPECT_EQ(expected_client_identity, identity[0]);
- }
-}
-
bool CheckIsLocalhost(const grpc::string& addr) {
const grpc::string kIpv6("ipv6:[::1]:");
const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:");
@@ -212,138 +179,6 @@ class Proxy : public ::grpc::testing::EchoTestService::Service {
std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_;
};
-class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
- public:
- TestServiceImpl() : signal_client_(false), host_() {}
- explicit TestServiceImpl(const grpc::string& host)
- : signal_client_(false), host_(new grpc::string(host)) {}
-
- Status Echo(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) GRPC_OVERRIDE {
- response->set_message(request->message());
- MaybeEchoDeadline(context, request, response);
- if (host_) {
- response->mutable_param()->set_host(*host_);
- }
- if (request->has_param() && request->param().client_cancel_after_us()) {
- {
- std::unique_lock<std::mutex> lock(mu_);
- signal_client_ = true;
- }
- while (!context->IsCancelled()) {
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().client_cancel_after_us(),
- GPR_TIMESPAN)));
- }
- return Status::CANCELLED;
- } else if (request->has_param() &&
- request->param().server_cancel_after_us()) {
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().server_cancel_after_us(),
- GPR_TIMESPAN)));
- return Status::CANCELLED;
- } else if (!request->has_param() ||
- !request->param().skip_cancelled_check()) {
- EXPECT_FALSE(context->IsCancelled());
- }
-
- if (request->has_param() && request->param().echo_metadata()) {
- const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
- context->client_metadata();
- for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
- iter = client_metadata.begin();
- iter != client_metadata.end(); ++iter) {
- context->AddTrailingMetadata(ToString(iter->first),
- ToString(iter->second));
- }
- }
- if (request->has_param() &&
- (request->param().expected_client_identity().length() > 0 ||
- request->param().check_auth_context())) {
- CheckServerAuthContext(context,
- request->param().expected_client_identity());
- }
- if (request->has_param() &&
- request->param().response_message_length() > 0) {
- response->set_message(
- grpc::string(request->param().response_message_length(), '\0'));
- }
- if (request->has_param() && request->param().echo_peer()) {
- response->mutable_param()->set_peer(context->peer());
- }
- return Status::OK;
- }
-
- // Unimplemented is left unimplemented to test the returned error.
-
- Status RequestStream(ServerContext* context,
- ServerReader<EchoRequest>* reader,
- EchoResponse* response) GRPC_OVERRIDE {
- EchoRequest request;
- response->set_message("");
- int cancel_after_reads = 0;
- const std::multimap<grpc::string_ref, grpc::string_ref>&
- client_initial_metadata = context->client_metadata();
- if (client_initial_metadata.find(kServerCancelAfterReads) !=
- client_initial_metadata.end()) {
- std::istringstream iss(ToString(
- client_initial_metadata.find(kServerCancelAfterReads)->second));
- iss >> cancel_after_reads;
- gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads);
- }
- while (reader->Read(&request)) {
- if (cancel_after_reads == 1) {
- gpr_log(GPR_INFO, "return cancel status");
- return Status::CANCELLED;
- } else if (cancel_after_reads > 0) {
- cancel_after_reads--;
- }
- response->mutable_message()->append(request.message());
- }
- return Status::OK;
- }
-
- // Return 3 messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
- Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
- EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
-
- return Status::OK;
- }
-
- Status BidiStream(ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream)
- GRPC_OVERRIDE {
- EchoRequest request;
- EchoResponse response;
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- stream->Write(response);
- }
- return Status::OK;
- }
-
- bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
- return signal_client_;
- }
-
- private:
- bool signal_client_;
- std::mutex mu_;
- std::unique_ptr<grpc::string> host_;
-};
-
class TestServiceImplDupPkg
: public ::grpc::testing::duplicate::EchoTestService::Service {
public:
@@ -471,6 +306,301 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
}
}
+// This class is for testing scenarios where RPCs are cancelled on the server
+// by calling ServerContext::TryCancel()
+class End2endServerTryCancelTest : public End2endTest {
+ protected:
+ // Helper for testing client-streaming RPCs which are cancelled on the server.
+ // Depending on the value of server_try_cancel parameter, this will test one
+ // of the following three scenarios:
+ // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
+ // any messages from the client
+ //
+ // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
+ // messages from the client
+ //
+ // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
+ // the messages from the client
+ //
+ // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
+ void TestRequestStreamServerCancel(
+ ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ // Send server_try_cancel value in the client metadata
+ context.AddMetadata(kServerTryCancelRequest,
+ std::to_string(server_try_cancel));
+
+ auto stream = stub_->RequestStream(&context, &response);
+
+ int num_msgs_sent = 0;
+ while (num_msgs_sent < num_msgs_to_send) {
+ request.set_message("hello");
+ if (!stream->Write(request)) {
+ break;
+ }
+ num_msgs_sent++;
+ }
+ gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
+
+ stream->WritesDone();
+ Status s = stream->Finish();
+
+ // At this point, we know for sure that RPC was cancelled by the server
+ // since we passed server_try_cancel value in the metadata. Depending on the
+ // value of server_try_cancel, the RPC might have been cancelled by the
+ // server at different stages. The following validates our expectations of
+ // number of messages sent in various cancellation scenarios:
+
+ switch (server_try_cancel) {
+ case CANCEL_BEFORE_PROCESSING:
+ case CANCEL_DURING_PROCESSING:
+ // If the RPC is cancelled by server before / during messages from the
+ // client, it means that the client most likely did not get a chance to
+ // send all the messages it wanted to send. i.e num_msgs_sent <=
+ // num_msgs_to_send
+ EXPECT_LE(num_msgs_sent, num_msgs_to_send);
+ break;
+
+ case CANCEL_AFTER_PROCESSING:
+ // If the RPC was cancelled after all messages were read by the server,
+ // the client did get a chance to send all its messages
+ EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
+ break;
+
+ default:
+ gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
+ server_try_cancel);
+ EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
+ server_try_cancel <= CANCEL_AFTER_PROCESSING);
+ break;
+ }
+
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ }
+
+ // Helper for testing server-streaming RPCs which are cancelled on the server.
+ // Depending on the value of server_try_cancel parameter, this will test one
+ // of the following three scenarios:
+ // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
+ // any messages to the client
+ //
+ // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
+ // messages to the client
+ //
+ // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
+ // the messages to the client
+ //
+ // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
+ void TestResponseStreamServerCancel(
+ ServerTryCancelRequestPhase server_try_cancel) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ // Send server_try_cancel in the client metadata
+ context.AddMetadata(kServerTryCancelRequest,
+ std::to_string(server_try_cancel));
+
+ request.set_message("hello");
+ auto stream = stub_->ResponseStream(&context, request);
+
+ int num_msgs_read = 0;
+ while (num_msgs_read < kNumResponseStreamsMsgs) {
+ if (!stream->Read(&response)) {
+ break;
+ }
+ EXPECT_EQ(response.message(),
+ request.message() + std::to_string(num_msgs_read));
+ num_msgs_read++;
+ }
+ gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
+
+ Status s = stream->Finish();
+
+ // Depending on the value of server_try_cancel, the RPC might have been
+ // cancelled by the server at different stages. The following validates our
+ // expectations of number of messages read in various cancellation
+ // scenarios:
+ switch (server_try_cancel) {
+ case CANCEL_BEFORE_PROCESSING:
+ // Server cancelled before sending any messages. Which means the client
+ // wouldn't have read any
+ EXPECT_EQ(num_msgs_read, 0);
+ break;
+
+ case CANCEL_DURING_PROCESSING:
+ // Server cancelled while writing messages. Client must have read less
+ // than or equal to the expected number of messages
+ EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs);
+ break;
+
+ case CANCEL_AFTER_PROCESSING:
+ // Server cancelled after writing all messages. Client must have read
+ // all messages
+ EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs);
+ break;
+
+ default: {
+ gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
+ server_try_cancel);
+ EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
+ server_try_cancel <= CANCEL_AFTER_PROCESSING);
+ break;
+ }
+ }
+
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ }
+
+ // Helper for testing bidirectional-streaming RPCs which are cancelled on the
+ // server. Depending on the value of server_try_cancel parameter, this will
+ // test one of the following three scenarios:
+ // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
+ // writing any messages from/to the client
+ //
+ // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
+ // writing messages from/to the client
+ //
+ // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
+ // all the messages from/to the client
+ //
+ // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
+ void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
+ int num_messages) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ // Send server_try_cancel in the client metadata
+ context.AddMetadata(kServerTryCancelRequest,
+ std::to_string(server_try_cancel));
+
+ auto stream = stub_->BidiStream(&context);
+
+ int num_msgs_read = 0;
+ int num_msgs_sent = 0;
+ while (num_msgs_sent < num_messages) {
+ request.set_message("hello " + std::to_string(num_msgs_sent));
+ if (!stream->Write(request)) {
+ break;
+ }
+ num_msgs_sent++;
+
+ if (!stream->Read(&response)) {
+ break;
+ }
+ num_msgs_read++;
+
+ EXPECT_EQ(response.message(), request.message());
+ }
+ gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
+ gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
+
+ stream->WritesDone();
+ Status s = stream->Finish();
+
+ // Depending on the value of server_try_cancel, the RPC might have been
+ // cancelled by the server at different stages. The following validates our
+ // expectations of number of messages read in various cancellation
+ // scenarios:
+ switch (server_try_cancel) {
+ case CANCEL_BEFORE_PROCESSING:
+ EXPECT_EQ(num_msgs_read, 0);
+ break;
+
+ case CANCEL_DURING_PROCESSING:
+ EXPECT_LE(num_msgs_sent, num_messages);
+ EXPECT_LE(num_msgs_read, num_msgs_sent);
+ break;
+
+ case CANCEL_AFTER_PROCESSING:
+ EXPECT_EQ(num_msgs_sent, num_messages);
+ EXPECT_EQ(num_msgs_read, num_msgs_sent);
+ break;
+
+ default:
+ gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
+ server_try_cancel);
+ EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
+ server_try_cancel <= CANCEL_AFTER_PROCESSING);
+ break;
+ }
+
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ }
+};
+
+TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.AddMetadata(kServerTryCancelRequest,
+ std::to_string(CANCEL_BEFORE_PROCESSING));
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+}
+
+// Server to cancel before doing reading the request
+TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
+ TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
+}
+
+// Server to cancel while reading a request from the stream in parallel
+TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
+ TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
+}
+
+// Server to cancel after reading all the requests but before returning to the
+// client
+TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
+ TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
+}
+
+// Server to cancel before sending any response messages
+TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
+ TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
+}
+
+// Server to cancel while writing a response to the stream in parallel
+TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
+ TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
+}
+
+// Server to cancel after writing all the respones to the stream but before
+// returning to the client
+TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
+ TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
+}
+
+// Server to cancel before reading/writing any requests/responses on the stream
+TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
+ TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
+}
+
+// Server to cancel while reading/writing requests/responses on the stream in
+// parallel
+TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
+ TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
+}
+
+// Server to cancel after reading/writing all requests/responses on the stream
+// but before returning to the client
+TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
+ TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
+}
+
TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
ResetStub();
std::vector<std::thread*> threads;
@@ -1224,6 +1354,9 @@ INSTANTIATE_TEST_CASE_P(End2end, End2endTest,
::testing::Values(TestScenario(false, false),
TestScenario(false, true)));
+INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest,
+ ::testing::Values(TestScenario(false, false)));
+
INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest,
::testing::Values(TestScenario(false, false),
TestScenario(false, true),
diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc
index f8405627f9..c72e20628f 100644
--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test {
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
- cqs_.push_back(std::move(builder.AddCompletionQueue()));
+ cqs_.push_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
}
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index c9a32ecf5a..66d11d0dfc 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -33,6 +33,8 @@
#include "test/cpp/end2end/test_service_impl.h"
+#include <thread>
+
#include <grpc++/security/credentials.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
@@ -82,6 +84,17 @@ void CheckServerAuthContext(const ServerContext* context,
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ if (server_try_cancel > DO_NOT_CANCEL) {
+ // Since this is a unary RPC, by the time this server handler is called,
+ // the 'request' message is already read from the client. So the scenarios
+ // in server_try_cancel don't make much sense. Just cancel the RPC as long
+ // as server_try_cancel is not DO_NOT_CANCEL
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {
@@ -106,7 +119,8 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)));
return Status::CANCELLED;
- } else {
+ } else if (!request->has_param() ||
+ !request->param().skip_cancelled_check()) {
EXPECT_FALSE(context->IsCancelled());
}
@@ -142,18 +156,39 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
Status TestServiceImpl::RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) {
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
+ // any message from the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ // If 'cancel_after_reads' is set in the metadata AND non-zero, the server
+ // will cancel the RPC (by just returning Status::CANCELLED - doesn't call
+ // ServerContext::TryCancel()) after reading the number of records specified
+ // by the 'cancel_after_reads' value set in the metadata.
+ int cancel_after_reads = GetIntValueFromMetadata(
+ kServerCancelAfterReads, context->client_metadata(), 0);
+
EchoRequest request;
response->set_message("");
- int cancel_after_reads = 0;
- const std::multimap<grpc::string_ref, grpc::string_ref>&
- client_initial_metadata = context->client_metadata();
- if (client_initial_metadata.find(kServerCancelAfterReads) !=
- client_initial_metadata.end()) {
- std::istringstream iss(ToString(
- client_initial_metadata.find(kServerCancelAfterReads)->second));
- iss >> cancel_after_reads;
- gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads);
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
+
+ int num_msgs_read = 0;
while (reader->Read(&request)) {
if (cancel_after_reads == 1) {
gpr_log(GPR_INFO, "return cancel status");
@@ -163,21 +198,65 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
}
response->mutable_message()->append(request.message());
}
+ gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
return Status::OK;
}
-// Return 3 messages.
+// Return 'kNumResponseStreamMsgs' messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status TestServiceImpl::ResponseStream(ServerContext* context,
const EchoRequest* request,
ServerWriter<EchoResponse>* writer) {
+ // If server_try_cancel is set in the metadata, the RPC is cancelled by the
+ // server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
+ // any messages to the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // writing messages to the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
+ // all the messages to the client
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ }
+
+ for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+ response.set_message(request->message() + std::to_string(i));
+ writer->Write(response);
+ }
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
return Status::OK;
}
@@ -185,15 +264,70 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
Status TestServiceImpl::BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
+ // If server_try_cancel is set in the metadata, the RPC is cancelled by the
+ // server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
+ // writes any messages from/to the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // reading/writing messages from/to the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
+ // reads/writes all messages from/to the client
+ int server_try_cancel = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
EchoRequest request;
EchoResponse response;
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ std::thread* server_try_cancel_thd = NULL;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ }
+
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
stream->Write(response);
}
+
+ if (server_try_cancel_thd != NULL) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
return Status::OK;
}
+int TestServiceImpl::GetIntValueFromMetadata(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value) {
+ if (metadata.find(key) != metadata.end()) {
+ std::istringstream iss(ToString(metadata.find(key)->second));
+ iss >> default_value;
+ gpr_log(GPR_INFO, "%s : %d", key, default_value);
+ }
+
+ return default_value;
+}
+
+void TestServiceImpl::ServerTryCancel(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
+ EXPECT_TRUE(context->IsCancelled());
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index 2c35b5614c..1ab6ced9e0 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -44,7 +44,16 @@
namespace grpc {
namespace testing {
+const int kNumResponseStreamsMsgs = 3;
const char* const kServerCancelAfterReads = "cancel_after_reads";
+const char* const kServerTryCancelRequest = "server_try_cancel";
+
+typedef enum {
+ DO_NOT_CANCEL = 0,
+ CANCEL_BEFORE_PROCESSING,
+ CANCEL_DURING_PROCESSING,
+ CANCEL_AFTER_PROCESSING
+} ServerTryCancelRequestPhase;
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
@@ -74,6 +83,14 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
}
private:
+ int GetIntValueFromMetadata(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value);
+
+ void ServerTryCancel(ServerContext* context);
+
+ private:
bool signal_client_;
std::mutex mu_;
std::unique_ptr<grpc::string> host_;
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index 4e8860e843..4c7caa9b87 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -45,6 +45,7 @@
#include <grpc/support/time.h>
#include <gtest/gtest.h>
+#include "src/core/surface/api_trace.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -54,6 +55,9 @@ using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;
+const int kNumThreads = 100; // Number of threads
+const int kNumRpcs = 1000; // Number of RPCs per thread
+
namespace grpc {
namespace testing {
@@ -84,7 +88,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
MaybeEchoDeadline(context, request, response);
if (request->has_param() && request->param().client_cancel_after_us()) {
{
- std::unique_lock<std::mutex> lock(mu_);
+ unique_lock<mutex> lock(mu_);
signal_client_ = true;
}
while (!context->IsCancelled()) {
@@ -149,13 +153,13 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
}
bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
+ unique_lock<mutex> lock(mu_);
return signal_client_;
}
private:
bool signal_client_;
- std::mutex mu_;
+ mutex mu_;
};
class TestServiceImplDupPkg
@@ -168,11 +172,10 @@ class TestServiceImplDupPkg
}
};
-class End2endTest : public ::testing::Test {
- protected:
- End2endTest() : kMaxMessageSize_(8192) {}
-
- void SetUp() GRPC_OVERRIDE {
+class CommonStressTest {
+ public:
+ CommonStressTest() : kMaxMessageSize_(8192) {}
+ void SetUp() {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
@@ -185,15 +188,15 @@ class End2endTest : public ::testing::Test {
builder.RegisterService(&dup_pkg_service_);
server_ = builder.BuildAndStart();
}
-
- void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
-
+ void TearDown() { server_->Shutdown(); }
void ResetStub() {
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
+ private:
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
@@ -202,6 +205,16 @@ class End2endTest : public ::testing::Test {
TestServiceImplDupPkg dup_pkg_service_;
};
+class End2endTest : public ::testing::Test {
+ protected:
+ End2endTest() {}
+ void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
+ void TearDown() GRPC_OVERRIDE { common_.TearDown(); }
+ void ResetStub() { common_.ResetStub(); }
+
+ CommonStressTest common_;
+};
+
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
EchoRequest request;
EchoResponse response;
@@ -216,17 +229,113 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
}
TEST_F(End2endTest, ThreadStress) {
- ResetStub();
+ common_.ResetStub();
std::vector<std::thread*> threads;
- for (int i = 0; i < 100; ++i) {
- threads.push_back(new std::thread(SendRpc, stub_.get(), 1000));
+ for (int i = 0; i < kNumThreads; ++i) {
+ threads.push_back(new std::thread(SendRpc, common_.GetStub(), kNumRpcs));
}
- for (int i = 0; i < 100; ++i) {
+ for (int i = 0; i < kNumThreads; ++i) {
threads[i]->join();
delete threads[i];
}
}
+class AsyncClientEnd2endTest : public ::testing::Test {
+ protected:
+ AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
+
+ void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
+ void TearDown() GRPC_OVERRIDE {
+ void* ignored_tag;
+ bool ignored_ok;
+ while (cq_.Next(&ignored_tag, &ignored_ok))
+ ;
+ common_.TearDown();
+ }
+
+ void Wait() {
+ unique_lock<mutex> l(mu_);
+ while (rpcs_outstanding_ != 0) {
+ cv_.wait(l);
+ }
+
+ cq_.Shutdown();
+ }
+
+ struct AsyncClientCall {
+ EchoResponse response;
+ ClientContext context;
+ Status status;
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
+ };
+
+ void AsyncSendRpc(int num_rpcs) {
+ for (int i = 0; i < num_rpcs; ++i) {
+ AsyncClientCall* call = new AsyncClientCall;
+ EchoRequest request;
+ request.set_message("Hello");
+ call->response_reader =
+ common_.GetStub()->AsyncEcho(&call->context, request, &cq_);
+ call->response_reader->Finish(&call->response, &call->status,
+ (void*)call);
+
+ unique_lock<mutex> l(mu_);
+ rpcs_outstanding_++;
+ }
+ }
+
+ void AsyncCompleteRpc() {
+ while (true) {
+ void* got_tag;
+ bool ok = false;
+ if (!cq_.Next(&got_tag, &ok)) break;
+ AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
+ GPR_ASSERT(ok);
+ delete call;
+
+ bool notify;
+ {
+ unique_lock<mutex> l(mu_);
+ rpcs_outstanding_--;
+ notify = (rpcs_outstanding_ == 0);
+ }
+ if (notify) {
+ cv_.notify_all();
+ }
+ }
+ }
+
+ CommonStressTest common_;
+ CompletionQueue cq_;
+ mutex mu_;
+ condition_variable cv_;
+ int rpcs_outstanding_;
+};
+
+TEST_F(AsyncClientEnd2endTest, ThreadStress) {
+ common_.ResetStub();
+ std::vector<std::thread*> send_threads, completion_threads;
+ for (int i = 0; i < kNumThreads / 2; ++i) {
+ completion_threads.push_back(new std::thread(
+ &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this));
+ }
+ for (int i = 0; i < kNumThreads / 2; ++i) {
+ send_threads.push_back(
+ new std::thread(&AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc,
+ this, kNumRpcs));
+ }
+ for (int i = 0; i < kNumThreads / 2; ++i) {
+ send_threads[i]->join();
+ delete send_threads[i];
+ }
+
+ Wait();
+ for (int i = 0; i < kNumThreads / 2; ++i) {
+ completion_threads[i]->join();
+ delete completion_threads[i];
+ }
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc
index c2f3ad8fde..fad9a323af 100644
--- a/test/cpp/qps/limit_cores.cc
+++ b/test/cpp/qps/limit_cores.cc
@@ -36,7 +36,6 @@
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
-#include <vector>
namespace grpc {
namespace testing {
@@ -46,11 +45,11 @@ namespace testing {
#define _GNU_SOURCE
#endif
#include <sched.h>
-int LimitCores(const int *cores, int cores_size) {
+int LimitCores(const int* cores, int cores_size) {
const int num_cores = gpr_cpu_num_cores();
int cores_set = 0;
- cpu_set_t *cpup = CPU_ALLOC(num_cores);
+ cpu_set_t* cpup = CPU_ALLOC(num_cores);
GPR_ASSERT(cpup);
const size_t size = CPU_ALLOC_SIZE(num_cores);
CPU_ZERO_S(size, cpup);
@@ -74,7 +73,7 @@ int LimitCores(const int *cores, int cores_size) {
}
#else
// LimitCores is not currently supported for non-Linux platforms
-int LimitCores(std::vector<int> core_vec) { return gpr_cpu_num_cores(); }
+int LimitCores(const int*, int) { return gpr_cpu_num_cores(); }
#endif
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/limit_cores.h b/test/cpp/qps/limit_cores.h
index 5c0d1e315d..5482904a3c 100644
--- a/test/cpp/qps/limit_cores.h
+++ b/test/cpp/qps/limit_cores.h
@@ -34,8 +34,6 @@
#ifndef TEST_QPS_LIMIT_CORES_H
#define TEST_QPS_LIMIT_CORES_H
-#include <vector>
-
namespace grpc {
namespace testing {
/// LimitCores: allow this worker to only run on the cores specified in the