aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/common/auth_property_iterator_test.cc2
-rw-r--r--test/cpp/common/secure_auth_context_test.cc2
-rw-r--r--test/cpp/end2end/async_end2end_test.cc375
-rw-r--r--test/cpp/end2end/end2end_test.cc40
-rw-r--r--test/cpp/end2end/generic_end2end_test.cc2
-rw-r--r--test/cpp/end2end/hybrid_end2end_test.cc11
-rw-r--r--test/cpp/end2end/proto_server_reflection_test.cc166
-rw-r--r--test/cpp/end2end/server_builder_plugin_test.cc265
-rw-r--r--test/cpp/end2end/test_service_impl.cc8
-rw-r--r--test/cpp/end2end/test_service_impl.h1
-rw-r--r--test/cpp/end2end/zookeeper_test.cc2
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc11
-rw-r--r--test/cpp/interop/client.cc18
-rw-r--r--test/cpp/interop/client_helper.cc19
-rw-r--r--test/cpp/interop/interop_client.cc446
-rw-r--r--test/cpp/interop/interop_client.h62
-rw-r--r--test/cpp/interop/metrics_client.cc18
-rw-r--r--test/cpp/interop/server_main.cc24
-rw-r--r--test/cpp/interop/stress_interop_client.cc97
-rw-r--r--test/cpp/interop/stress_interop_client.h32
-rw-r--r--test/cpp/interop/stress_test.cc32
-rw-r--r--test/cpp/qps/async_streaming_ping_pong_test.cc78
-rw-r--r--test/cpp/qps/async_unary_ping_pong_test.cc77
-rw-r--r--test/cpp/qps/client.h17
-rw-r--r--test/cpp/qps/client_async.cc13
-rw-r--r--test/cpp/qps/client_sync.cc1
-rw-r--r--test/cpp/qps/driver.cc12
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py33
-rw-r--r--test/cpp/qps/generic_async_streaming_ping_pong_test.cc82
-rw-r--r--test/cpp/qps/parse_json.cc19
-rw-r--r--test/cpp/qps/parse_json.h5
-rw-r--r--test/cpp/qps/perf_db_client.cc140
-rw-r--r--test/cpp/qps/perf_db_client.h113
-rw-r--r--test/cpp/qps/qps_json_driver.cc2
-rw-r--r--test/cpp/qps/qps_test.cc4
-rw-r--r--test/cpp/qps/report.cc18
-rw-r--r--test/cpp/qps/report.h1
-rw-r--r--test/cpp/qps/server_async.cc10
-rw-r--r--test/cpp/qps/server_sync.cc1
-rw-r--r--test/cpp/qps/sync_streaming_ping_pong_test.cc76
-rw-r--r--test/cpp/qps/sync_unary_ping_pong_test.cc77
-rw-r--r--test/cpp/util/byte_buffer_proto_helper.h2
-rw-r--r--test/cpp/util/cli_call.cc2
-rw-r--r--test/cpp/util/grpc_cli.cc129
-rw-r--r--test/cpp/util/metrics_server.cc45
-rw-r--r--test/cpp/util/metrics_server.h40
-rw-r--r--test/cpp/util/proto_file_parser.cc178
-rw-r--r--test/cpp/util/proto_file_parser.h85
-rw-r--r--test/cpp/util/proto_reflection_descriptor_database.cc316
-rw-r--r--test/cpp/util/proto_reflection_descriptor_database.h131
50 files changed, 2175 insertions, 1165 deletions
diff --git a/test/cpp/common/auth_property_iterator_test.cc b/test/cpp/common/auth_property_iterator_test.cc
index 0e43d4e1e0..66225ff335 100644
--- a/test/cpp/common/auth_property_iterator_test.cc
+++ b/test/cpp/common/auth_property_iterator_test.cc
@@ -38,7 +38,7 @@
#include "test/cpp/util/string_ref_helper.h"
extern "C" {
-#include "src/core/lib/security/security_context.h"
+#include "src/core/lib/security/context/security_context.h"
}
using ::grpc::testing::ToString;
diff --git a/test/cpp/common/secure_auth_context_test.cc b/test/cpp/common/secure_auth_context_test.cc
index 067361334d..b131452f73 100644
--- a/test/cpp/common/secure_auth_context_test.cc
+++ b/test/cpp/common/secure_auth_context_test.cc
@@ -38,7 +38,7 @@
#include "test/cpp/util/string_ref_helper.h"
extern "C" {
-#include "src/core/lib/security/security_context.h"
+#include "src/core/lib/security/context/security_context.h"
}
using grpc::testing::ToString;
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 7e4d6046d6..8229bda6bf 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -51,6 +51,7 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/string_ref_helper.h"
+#include "test/cpp/util/test_credentials_provider.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_posix.h"
@@ -58,6 +59,7 @@
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
+using grpc::testing::kTlsCredentialsType;
using std::chrono::system_clock;
GPR_TLS_DECL(g_is_async_end2end_test);
@@ -197,22 +199,68 @@ class Verifier {
bool spin_;
};
-class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
+// This class disables the server builder plugins that may add sync services to
+// the server. If there are sync services, UnimplementedRpc test will triger
+// the sync unkown rpc routine on the server side, rather than the async one
+// that needs to be tested here.
+class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
+ public:
+ void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {}
+
+ void UpdatePlugins(
+ std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins)
+ GRPC_OVERRIDE {
+ auto plugin = plugins->begin();
+ while (plugin != plugins->end()) {
+ if ((*plugin).second->has_sync_methods()) {
+ plugins->erase(plugin++);
+ } else {
+ plugin++;
+ }
+ }
+ }
+};
+
+class TestScenario {
+ public:
+ TestScenario(bool non_block, const grpc::string& creds_type,
+ const grpc::string& content)
+ : disable_blocking(non_block),
+ credentials_type(creds_type),
+ message_content(content) {}
+ void Log() const {
+ gpr_log(
+ GPR_INFO,
+ "Scenario: disable_blocking %d, credentials %s, message size %" PRIuPTR,
+ disable_blocking, credentials_type.c_str(), message_content.size());
+ }
+ bool disable_blocking;
+ const grpc::string credentials_type;
+ const grpc::string message_content;
+};
+
+class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
- AsyncEnd2endTest() {}
+ AsyncEnd2endTest() { GetParam().Log(); }
void SetUp() GRPC_OVERRIDE {
- poll_overrider_.reset(new PollingOverrider(!GetParam()));
+ poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
- int port = grpc_pick_unused_port_or_die();
- server_address_ << "localhost:" << port;
+ port_ = grpc_pick_unused_port_or_die();
+ server_address_ << "localhost:" << port_;
// Setup server
ServerBuilder builder;
- builder.AddListeningPort(server_address_.str(),
- grpc::InsecureServerCredentials());
+ auto server_creds = GetServerCredentials(GetParam().credentials_type);
+ builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
+
+ // TODO(zyc): make a test option to choose wheather sync plugins should be
+ // deleted
+ std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
+ new ServerBuilderSyncPluginDisabler());
+ builder.SetOption(move(sync_plugin_disabler));
server_ = builder.BuildAndStart();
gpr_tls_set(&g_is_async_end2end_test, 1);
@@ -227,11 +275,15 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
;
poll_overrider_.reset();
gpr_tls_set(&g_is_async_end2end_test, 0);
+ grpc_recycle_unused_port(port_);
}
void ResetStub() {
+ ChannelArguments args;
+ auto channel_creds =
+ GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
@@ -247,22 +299,23 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -274,6 +327,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<Server> server_;
grpc::testing::EchoTestService::AsyncService service_;
std::ostringstream server_address_;
+ int port_;
std::unique_ptr<PollingOverrider> poll_overrider_;
};
@@ -302,7 +356,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -310,23 +364,22 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10));
- Verifier(GetParam()).Verify(cq_.get(), time_now);
- Verifier(GetParam()).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam())
- .Expect(3, true)
- .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam())
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@@ -347,41 +400,48 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ServerContext srv_ctx;
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(1, true)
+ .Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -400,39 +460,45 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ServerContext srv_ctx;
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -450,41 +516,48 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ServerContext srv_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -503,7 +576,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
@@ -516,7 +589,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@@ -529,11 +602,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
-
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -552,7 +625,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@@ -561,15 +634,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second,
ToString(server_initial_metadata.find(meta1.first)->second));
@@ -579,10 +652,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -601,7 +675,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@@ -610,20 +684,22 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
+ response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -647,7 +723,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2(
"key2-bin",
@@ -671,7 +747,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@@ -683,9 +759,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second,
ToString(server_initial_metadata.find(meta3.first)->second));
@@ -697,11 +773,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
+ response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -726,7 +804,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -734,15 +812,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel();
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
@@ -761,7 +839,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -769,25 +847,29 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
- EXPECT_FALSE(srv_ctx.IsCancelled());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
+ EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
+ ChannelArguments args;
+ auto channel_creds =
+ GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
stub = grpc::testing::UnimplementedService::NewStub(channel);
EchoRequest send_request;
@@ -795,12 +877,12 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
Status recv_status;
ClientContext cli_ctx;
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
@@ -847,23 +929,25 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'RequestStream' call on client
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + std::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
- Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, true)
+ .Verify(cq_.get());
}
cli_stream->WritesDone(tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
@@ -871,7 +955,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
@@ -881,7 +965,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -939,13 +1023,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server sends the final message and cancelled status (but the RPC is
// already cancelled at this point. So we expect the operation to fail)
srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- // TODO(sreek): The expectation here should be true. This is a bug (github
- // issue #4972)
- Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -979,13 +1061,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'ResponseStream' call on the client
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
@@ -994,7 +1076,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@@ -1004,7 +1086,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -1064,7 +1146,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
- Verifier(GetParam())
+ Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
@@ -1075,11 +1157,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server finishes the stream (but the RPC is already cancelled)
srv_stream.Finish(Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1114,19 +1196,19 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
bool expected_cq_result = true;
bool ignore_cq_result = false;
@@ -1134,7 +1216,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@@ -1144,7 +1226,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -1244,10 +1326,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// know that cq results are supposed to return false on server.
srv_stream.Finish(Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1289,11 +1371,48 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
+std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
+ bool test_secure,
+ int test_big_limit) {
+ std::vector<TestScenario> scenarios;
+ std::vector<grpc::string> credentials_types;
+ std::vector<grpc::string> messages;
+
+ credentials_types.push_back(kInsecureCredentialsType);
+ auto sec_list = GetSecureCredentialsTypeList();
+ for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
+ credentials_types.push_back(*sec);
+ }
+
+ messages.push_back("Hello");
+ for (int sz = 1; sz < test_big_limit; sz *= 2) {
+ grpc::string big_msg;
+ for (int i = 0; i < sz * 1024; i++) {
+ char c = 'a' + (i % 26);
+ big_msg += c;
+ }
+ messages.push_back(big_msg);
+ }
+
+ for (auto cred = credentials_types.begin(); cred != credentials_types.end();
+ ++cred) {
+ for (auto msg = messages.begin(); msg != messages.end(); msg++) {
+ scenarios.push_back(TestScenario(false, *cred, *msg));
+ if (test_disable_blocking) {
+ scenarios.push_back(TestScenario(true, *cred, *msg));
+ }
+ }
+ }
+ return scenarios;
+}
+
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
- ::testing::Values(false, true));
+ ::testing::ValuesIn(CreateTestScenarios(true, true,
+ 1024)));
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
AsyncEnd2endServerTryCancelTest,
- ::testing::Values(false));
+ ::testing::ValuesIn(CreateTestScenarios(false, false,
+ 0)));
} // namespace
} // namespace testing
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 0c9313f88f..e3667cf26b 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -48,7 +48,7 @@
#include <grpc/support/time.h>
#include <gtest/gtest.h>
-#include "src/core/lib/security/credentials.h"
+#include "src/core/lib/security/credentials/credentials.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -975,6 +975,34 @@ TEST_P(End2endTest, NonExistingService) {
EXPECT_EQ("", s.error_message());
}
+// Ask the server to send back a serialized proto in trailer.
+// This is an example of setting error details.
+TEST_P(End2endTest, BinaryTrailerTest) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ request.mutable_param()->set_echo_metadata(true);
+ DebugInfo* info = request.mutable_param()->mutable_debug_info();
+ info->add_stack_entries("stack_entry_1");
+ info->add_stack_entries("stack_entry_2");
+ info->add_stack_entries("stack_entry_3");
+ info->set_detail("detailed debug info");
+ grpc::string expected_string = info->SerializeAsString();
+ request.set_message("Hello");
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.ok());
+ auto trailers = context.GetServerTrailingMetadata();
+ EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
+ auto iter = trailers.find(kDebugInfoTrailerKey);
+ EXPECT_EQ(expected_string, iter->second);
+ // Parse the returned trailer into a DebugInfo proto.
+ DebugInfo returned_info;
+ EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
+}
+
//////////////////////////////////////////////////////////////////////////
// Test with and without a proxy.
class ProxyEnd2endTest : public End2endTest {
@@ -986,6 +1014,16 @@ TEST_P(ProxyEnd2endTest, SimpleRpc) {
SendRpc(stub_.get(), 1, false);
}
+TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+
+ ClientContext context;
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(ProxyEnd2endTest, MultipleRpcs) {
ResetStub();
std::vector<std::thread*> threads;
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index d0cf6aea9d..57efa5fa17 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -38,7 +38,7 @@
#include <grpc++/create_channel.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/generic/generic_stub.h>
-#include <grpc++/impl/proto_utils.h>
+#include <grpc++/impl/codegen/proto_utils.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc
index 02043a89d3..2c05db345b 100644
--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -207,6 +207,9 @@ class HybridEnd2endTest : public ::testing::Test {
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
+ // Always add a sync unimplemented service: we rely on having at least one
+ // synchronous method to get a listening cq
+ builder.RegisterService(&unimplemented_service_);
builder.RegisterService(service1);
if (service2) {
builder.RegisterService(service2);
@@ -216,7 +219,7 @@ class HybridEnd2endTest : public ::testing::Test {
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
- cqs_.push_back(builder.AddCompletionQueue());
+ cqs_.push_back(builder.AddCompletionQueue(false));
}
server_ = builder.BuildAndStart();
}
@@ -252,6 +255,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest send_request;
EchoResponse recv_response;
ClientContext cli_ctx;
+ cli_ctx.set_fail_fast(false);
send_request.set_message("Hello");
Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
EXPECT_EQ(send_request.message(), recv_response.message());
@@ -265,6 +269,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest send_request;
EchoResponse recv_response;
ClientContext cli_ctx;
+ cli_ctx.set_fail_fast(false);
send_request.set_message("Hello");
Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
@@ -276,6 +281,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoResponse recv_response;
grpc::string expected_message;
ClientContext cli_ctx;
+ cli_ctx.set_fail_fast(false);
send_request.set_message("Hello");
auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
for (int i = 0; i < 5; i++) {
@@ -292,6 +298,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest request;
EchoResponse response;
ClientContext context;
+ context.set_fail_fast(false);
request.set_message("hello");
auto stream = stub_->ResponseStream(&context, request);
@@ -311,6 +318,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest request;
EchoResponse response;
ClientContext context;
+ context.set_fail_fast(false);
grpc::string msg("hello");
auto stream = stub_->BidiStream(&context);
@@ -338,6 +346,7 @@ class HybridEnd2endTest : public ::testing::Test {
EXPECT_TRUE(s.ok());
}
+ grpc::testing::UnimplementedService::Service unimplemented_service_;
std::vector<std::unique_ptr<ServerCompletionQueue> > cqs_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc
new file mode 100644
index 0000000000..f8fc39b553
--- /dev/null
+++ b/test/cpp/end2end/proto_server_reflection_test.cc
@@ -0,0 +1,166 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <google/protobuf/descriptor.h>
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/ext/proto_server_reflection_plugin.h>
+#include <grpc++/security/credentials.h>
+#include <grpc++/security/server_credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc/grpc.h>
+#include <gtest/gtest.h>
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+#include "test/cpp/util/proto_reflection_descriptor_database.h"
+
+namespace grpc {
+namespace testing {
+
+class ProtoServerReflectionTest : public ::testing::Test {
+ public:
+ ProtoServerReflectionTest() {}
+
+ void SetUp() GRPC_OVERRIDE {
+ port_ = grpc_pick_unused_port_or_die();
+ ref_desc_pool_ = google::protobuf::DescriptorPool::generated_pool();
+
+ ServerBuilder builder;
+ grpc::string server_address = "localhost:" + to_string(port_);
+ builder.AddListeningPort(server_address, InsecureServerCredentials());
+ server_ = builder.BuildAndStart();
+ }
+
+ void ResetStub() {
+ string target = "dns:localhost:" + to_string(port_);
+ std::shared_ptr<Channel> channel =
+ CreateChannel(target, InsecureChannelCredentials());
+ stub_ = grpc::testing::EchoTestService::NewStub(channel);
+ desc_db_.reset(new ProtoReflectionDescriptorDatabase(channel));
+ desc_pool_.reset(new google::protobuf::DescriptorPool(desc_db_.get()));
+ }
+
+ string to_string(const int number) {
+ std::stringstream strs;
+ strs << number;
+ return strs.str();
+ }
+
+ void CompareService(const grpc::string& service) {
+ const google::protobuf::ServiceDescriptor* service_desc =
+ desc_pool_->FindServiceByName(service);
+ const google::protobuf::ServiceDescriptor* ref_service_desc =
+ ref_desc_pool_->FindServiceByName(service);
+ EXPECT_TRUE(service_desc != nullptr);
+ EXPECT_TRUE(ref_service_desc != nullptr);
+ EXPECT_EQ(service_desc->DebugString(), ref_service_desc->DebugString());
+
+ const google::protobuf::FileDescriptor* file_desc = service_desc->file();
+ if (known_files_.find(file_desc->package() + "/" + file_desc->name()) !=
+ known_files_.end()) {
+ EXPECT_EQ(file_desc->DebugString(),
+ ref_service_desc->file()->DebugString());
+ known_files_.insert(file_desc->package() + "/" + file_desc->name());
+ }
+
+ for (int i = 0; i < service_desc->method_count(); ++i) {
+ CompareMethod(service_desc->method(i)->full_name());
+ }
+ }
+
+ void CompareMethod(const grpc::string& method) {
+ const google::protobuf::MethodDescriptor* method_desc =
+ desc_pool_->FindMethodByName(method);
+ const google::protobuf::MethodDescriptor* ref_method_desc =
+ ref_desc_pool_->FindMethodByName(method);
+ EXPECT_TRUE(method_desc != nullptr);
+ EXPECT_TRUE(ref_method_desc != nullptr);
+ EXPECT_EQ(method_desc->DebugString(), ref_method_desc->DebugString());
+
+ CompareType(method_desc->input_type()->full_name());
+ CompareType(method_desc->output_type()->full_name());
+ }
+
+ void CompareType(const grpc::string& type) {
+ if (known_types_.find(type) != known_types_.end()) {
+ return;
+ }
+
+ const google::protobuf::Descriptor* desc =
+ desc_pool_->FindMessageTypeByName(type);
+ const google::protobuf::Descriptor* ref_desc =
+ ref_desc_pool_->FindMessageTypeByName(type);
+ EXPECT_TRUE(desc != nullptr);
+ EXPECT_TRUE(ref_desc != nullptr);
+ EXPECT_EQ(desc->DebugString(), ref_desc->DebugString());
+ }
+
+ protected:
+ std::unique_ptr<Server> server_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ std::unique_ptr<ProtoReflectionDescriptorDatabase> desc_db_;
+ std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_;
+ std::unordered_set<string> known_files_;
+ std::unordered_set<string> known_types_;
+ const google::protobuf::DescriptorPool* ref_desc_pool_;
+ int port_;
+ reflection::ProtoServerReflectionPlugin plugin_;
+};
+
+TEST_F(ProtoServerReflectionTest, CheckResponseWithLocalDescriptorPool) {
+ ResetStub();
+
+ std::vector<std::string> services;
+ desc_db_->GetServices(&services);
+ // The service list has at least one service (reflection servcie).
+ EXPECT_TRUE(services.size() > 0);
+
+ for (auto it = services.begin(); it != services.end(); ++it) {
+ CompareService(*it);
+ }
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc
new file mode 100644
index 0000000000..1c1095087a
--- /dev/null
+++ b/test/cpp/end2end/server_builder_plugin_test.cc
@@ -0,0 +1,265 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/impl/server_builder_option.h>
+#include <grpc++/impl/server_builder_plugin.h>
+#include <grpc++/impl/server_initializer.h>
+#include <grpc++/security/credentials.h>
+#include <grpc++/security/server_credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc/grpc.h>
+#include <gtest/gtest.h>
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+
+#define PLUGIN_NAME "TestServerBuilderPlugin"
+
+namespace grpc {
+namespace testing {
+
+class TestServerBuilderPlugin : public ServerBuilderPlugin {
+ public:
+ TestServerBuilderPlugin() : service_(new TestServiceImpl()) {
+ init_server_is_called_ = false;
+ finish_is_called_ = false;
+ change_arguments_is_called_ = false;
+ register_service_ = false;
+ }
+
+ grpc::string name() GRPC_OVERRIDE { return PLUGIN_NAME; }
+
+ void InitServer(ServerInitializer* si) GRPC_OVERRIDE {
+ init_server_is_called_ = true;
+ if (register_service_) {
+ si->RegisterService(service_);
+ }
+ }
+
+ void Finish(ServerInitializer* si) GRPC_OVERRIDE { finish_is_called_ = true; }
+
+ void ChangeArguments(const grpc::string& name, void* value) GRPC_OVERRIDE {
+ change_arguments_is_called_ = true;
+ }
+
+ bool has_async_methods() const GRPC_OVERRIDE {
+ if (register_service_) {
+ return service_->has_async_methods();
+ }
+ return false;
+ }
+
+ bool has_sync_methods() const GRPC_OVERRIDE {
+ if (register_service_) {
+ return service_->has_synchronous_methods();
+ }
+ return false;
+ }
+
+ void SetRegisterService() { register_service_ = true; }
+
+ bool init_server_is_called() { return init_server_is_called_; }
+ bool finish_is_called() { return finish_is_called_; }
+ bool change_arguments_is_called() { return change_arguments_is_called_; }
+
+ private:
+ bool init_server_is_called_;
+ bool finish_is_called_;
+ bool change_arguments_is_called_;
+ bool register_service_;
+ std::shared_ptr<TestServiceImpl> service_;
+};
+
+class InsertPluginServerBuilderOption : public ServerBuilderOption {
+ public:
+ InsertPluginServerBuilderOption() { register_service_ = false; }
+
+ void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {}
+
+ void UpdatePlugins(
+ std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins)
+ GRPC_OVERRIDE {
+ plugins->clear();
+
+ std::unique_ptr<TestServerBuilderPlugin> plugin(
+ new TestServerBuilderPlugin());
+ if (register_service_) plugin->SetRegisterService();
+ (*plugins)[plugin->name()] = std::move(plugin);
+ }
+
+ void SetRegisterService() { register_service_ = true; }
+
+ private:
+ bool register_service_;
+};
+
+std::unique_ptr<ServerBuilderPlugin> CreateTestServerBuilderPlugin() {
+ return std::unique_ptr<ServerBuilderPlugin>(new TestServerBuilderPlugin());
+}
+
+void AddTestServerBuilderPlugin() {
+ static bool already_here = false;
+ if (already_here) return;
+ already_here = true;
+ ::grpc::ServerBuilder::InternalAddPluginFactory(
+ &CreateTestServerBuilderPlugin);
+}
+
+// Force AddServerBuilderPlugin() to be called at static initialization time.
+struct StaticTestPluginInitializer {
+ StaticTestPluginInitializer() { AddTestServerBuilderPlugin(); }
+} static_plugin_initializer_test_;
+
+// When the param boolean is true, the ServerBuilder plugin will be added at the
+// time of static initialization. When it's false, the ServerBuilder plugin will
+// be added using ServerBuilder::SetOption().
+class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
+ public:
+ ServerBuilderPluginTest() {}
+
+ void SetUp() GRPC_OVERRIDE {
+ port_ = grpc_pick_unused_port_or_die();
+ builder_.reset(new ServerBuilder());
+ }
+
+ void InsertPlugin() {
+ if (GetParam()) {
+ // Add ServerBuilder plugin in static initialization
+ EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr);
+ } else {
+ // Add ServerBuilder plugin using ServerBuilder::SetOption()
+ builder_->SetOption(std::unique_ptr<ServerBuilderOption>(
+ new InsertPluginServerBuilderOption()));
+ }
+ }
+
+ void InsertPluginWithTestService() {
+ if (GetParam()) {
+ // Add ServerBuilder plugin in static initialization
+ EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr);
+ auto plugin = static_cast<TestServerBuilderPlugin*>(
+ builder_->plugins_[PLUGIN_NAME].get());
+ EXPECT_TRUE(plugin != nullptr);
+ plugin->SetRegisterService();
+ } else {
+ // Add ServerBuilder plugin using ServerBuilder::SetOption()
+ std::unique_ptr<InsertPluginServerBuilderOption> option(
+ new InsertPluginServerBuilderOption());
+ option->SetRegisterService();
+ builder_->SetOption(std::move(option));
+ }
+ }
+
+ void StartServer() {
+ grpc::string server_address = "localhost:" + to_string(port_);
+ builder_->AddListeningPort(server_address, InsecureServerCredentials());
+ cq_ = builder_->AddCompletionQueue();
+ server_ = builder_->BuildAndStart();
+ EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr);
+ }
+
+ void ResetStub() {
+ string target = "dns:localhost:" + to_string(port_);
+ channel_ = CreateChannel(target, InsecureChannelCredentials());
+ stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ }
+
+ void TearDown() GRPC_OVERRIDE {
+ EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr);
+ auto plugin = static_cast<TestServerBuilderPlugin*>(
+ builder_->plugins_[PLUGIN_NAME].get());
+ EXPECT_TRUE(plugin != nullptr);
+ EXPECT_TRUE(plugin->init_server_is_called());
+ EXPECT_TRUE(plugin->finish_is_called());
+ server_->Shutdown();
+ void* tag;
+ bool ok;
+ cq_->Shutdown();
+ while (cq_->Next(&tag, &ok))
+ ;
+ }
+
+ string to_string(const int number) {
+ std::stringstream strs;
+ strs << number;
+ return strs.str();
+ }
+
+ protected:
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<ServerBuilder> builder_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ std::unique_ptr<Server> server_;
+ TestServiceImpl service_;
+ int port_;
+};
+
+TEST_P(ServerBuilderPluginTest, PluginWithoutServiceTest) {
+ InsertPlugin();
+ StartServer();
+}
+
+TEST_P(ServerBuilderPluginTest, PluginWithServiceTest) {
+ InsertPluginWithTestService();
+ StartServer();
+ ResetStub();
+
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello hello hello hello");
+ ClientContext context;
+ context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok());
+}
+
+INSTANTIATE_TEST_CASE_P(ServerBuilderPluginTest, ServerBuilderPluginTest,
+ ::testing::Values(false, true));
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 2f5dd6d49e..cbaee92228 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -135,6 +135,14 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
context->AddTrailingMetadata(ToString(iter->first),
ToString(iter->second));
}
+ // Terminate rpc with error and debug info in trailer.
+ if (request->param().debug_info().stack_entries_size() ||
+ !request->param().debug_info().detail().empty()) {
+ grpc::string serialized_debug_info =
+ request->param().debug_info().SerializeAsString();
+ context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
+ return Status::CANCELLED;
+ }
}
if (request->has_param() &&
(request->param().expected_client_identity().length() > 0 ||
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index 1ab6ced9e0..c89f88c900 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -47,6 +47,7 @@ namespace testing {
const int kNumResponseStreamsMsgs = 3;
const char* const kServerCancelAfterReads = "cancel_after_reads";
const char* const kServerTryCancelRequest = "server_try_cancel";
+const char* const kDebugInfoTrailerKey = "debug-info-bin";
typedef enum {
DO_NOT_CANCEL = 0,
diff --git a/test/cpp/end2end/zookeeper_test.cc b/test/cpp/end2end/zookeeper_test.cc
index 12853a1b98..fdc500e535 100644
--- a/test/cpp/end2end/zookeeper_test.cc
+++ b/test/cpp/end2end/zookeeper_test.cc
@@ -101,7 +101,7 @@ class ZookeeperTest : public ::testing::Test {
zookeeper_address_ = addr_str;
gpr_free(addr);
}
- gpr_log(GPR_DEBUG, zookeeper_address_.c_str());
+ gpr_log(GPR_DEBUG, "%s", zookeeper_address_.c_str());
// Connects to zookeeper server
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index 92f93c869c..bf77878e0a 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -35,13 +35,13 @@
#include <string>
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
-#include "src/proto/grpc/lb/v0/load_balancer.pb.h" // C++ version
+#include "src/proto/grpc/lb/v1/load_balancer.pb.h" // C++ version
namespace grpc {
namespace {
-using grpc::lb::v0::LoadBalanceRequest;
-using grpc::lb::v0::LoadBalanceResponse;
+using grpc::lb::v1::LoadBalanceRequest;
+using grpc::lb::v1::LoadBalanceResponse;
class GrpclbTest : public ::testing::Test {};
@@ -60,9 +60,7 @@ TEST_F(GrpclbTest, CreateRequest) {
TEST_F(GrpclbTest, ParseResponse) {
LoadBalanceResponse response;
- const std::string client_config_str = "I'm a client config";
auto* initial_response = response.mutable_initial_response();
- initial_response->set_client_config(client_config_str);
auto* client_stats_report_interval =
initial_response->mutable_client_stats_report_interval();
client_stats_report_interval->set_seconds(123);
@@ -73,10 +71,7 @@ TEST_F(GrpclbTest, ParseResponse) {
gpr_slice_from_copied_string(encoded_response.c_str());
grpc_grpclb_response* c_response = grpc_grpclb_response_parse(encoded_slice);
EXPECT_TRUE(c_response->has_initial_response);
- EXPECT_TRUE(c_response->initial_response.has_client_config);
EXPECT_FALSE(c_response->initial_response.has_load_balancer_delegate);
- EXPECT_TRUE(strcmp(c_response->initial_response.client_config,
- client_config_str.c_str()) == 0);
EXPECT_EQ(c_response->initial_response.client_stats_report_interval.seconds,
123);
EXPECT_EQ(c_response->initial_response.client_stats_report_interval.nanos,
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 9af6a88044..addaf174f2 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "",
DEFINE_string(service_account_key_file, "",
"Path to service account json key file.");
DEFINE_string(oauth_scope, "", "Scope for OAuth tokens.");
+DEFINE_bool(do_not_abort_on_transient_failures, false,
+ "If set to 'true', abort() is not called in case of transient "
+ "failures (i.e failures that are temporary and will likely go away "
+ "on retrying; like a temporary connection failure) and an error "
+ "message is printed instead. Note that this flag just controls "
+ "whether abort() is called or not. It does not control whether the "
+ "test is retried in case of transient failures (and currently the "
+ "interop tests are not retried even if this flag is set to true)");
using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
@@ -89,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0;
- grpc::testing::InteropClient client(
- CreateChannelForTestCase(FLAGS_test_case));
+ grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
+ true,
+ FLAGS_do_not_abort_on_transient_failures);
if (FLAGS_test_case == "empty_unary") {
client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") {
@@ -162,8 +171,9 @@ int main(int argc, char** argv) {
"large_unary|large_compressed_unary|client_streaming|server_streaming|"
"server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|"
"cancel_after_first_response|timeout_on_sleeping_server|empty_stream|"
- "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds",
- "status_code_and_message|custom_metadata", FLAGS_test_case.c_str());
+ "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds|"
+ "status_code_and_message|custom_metadata",
+ FLAGS_test_case.c_str());
ret = 1;
}
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index c8b1e505c1..c171969e14 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -97,30 +97,25 @@ std::shared_ptr<Channel> CreateChannelForTestCase(
snprintf(host_port, host_port_buf_size, "%s:%d", FLAGS_server_host.c_str(),
FLAGS_server_port);
+ std::shared_ptr<CallCredentials> creds;
if (test_case == "compute_engine_creds") {
- std::shared_ptr<CallCredentials> creds;
GPR_ASSERT(FLAGS_use_tls);
creds = GoogleComputeEngineCredentials();
- return CreateTestChannel(host_port, FLAGS_server_host_override,
- FLAGS_use_tls, !FLAGS_use_test_ca, creds);
+ GPR_ASSERT(creds);
} else if (test_case == "jwt_token_creds") {
- std::shared_ptr<CallCredentials> creds;
GPR_ASSERT(FLAGS_use_tls);
grpc::string json_key = GetServiceAccountJsonKey();
std::chrono::seconds token_lifetime = std::chrono::hours(1);
creds =
ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
- return CreateTestChannel(host_port, FLAGS_server_host_override,
- FLAGS_use_tls, !FLAGS_use_test_ca, creds);
+ GPR_ASSERT(creds);
} else if (test_case == "oauth2_auth_token") {
grpc::string raw_token = GetOauth2AccessToken();
- std::shared_ptr<CallCredentials> creds = AccessTokenCredentials(raw_token);
- return CreateTestChannel(host_port, FLAGS_server_host_override,
- FLAGS_use_tls, !FLAGS_use_test_ca, creds);
- } else {
- return CreateTestChannel(host_port, FLAGS_server_host_override,
- FLAGS_use_tls, !FLAGS_use_test_ca);
+ creds = AccessTokenCredentials(raw_token);
+ GPR_ASSERT(creds);
}
+ return CreateTestChannel(host_port, FLAGS_server_host_override, FLAGS_use_tls,
+ !FLAGS_use_test_ca, creds);
}
} // namespace testing
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index 22293d211f..0bf1fd6f73 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -55,42 +55,36 @@
namespace grpc {
namespace testing {
-static const char* kRandomFile = "test/cpp/interop/rnd.dat";
-
namespace {
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
-const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
+const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979};
const int kNumResponseMessages = 2000;
const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
-CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
- grpc_compression_algorithm algorithm) {
- switch (algorithm) {
- case GRPC_COMPRESS_NONE:
- return CompressionType::NONE;
- case GRPC_COMPRESS_GZIP:
- return CompressionType::GZIP;
- case GRPC_COMPRESS_DEFLATE:
- return CompressionType::DEFLATE;
- default:
- GPR_ASSERT(false);
- }
-}
-
void NoopChecks(const InteropClientContextInspector& inspector,
const SimpleRequest* request, const SimpleResponse* response) {}
void CompressionChecks(const InteropClientContextInspector& inspector,
const SimpleRequest* request,
const SimpleResponse* response) {
- GPR_ASSERT(request->response_compression() ==
- GetInteropCompressionTypeFromCompressionAlgorithm(
- inspector.GetCallCompressionAlgorithm()));
- if (request->response_compression() == NONE) {
+ const grpc_compression_algorithm received_compression =
+ inspector.GetCallCompressionAlgorithm();
+ if (request->request_compressed_response() &&
+ received_compression == GRPC_COMPRESS_NONE) {
+ if (request->request_compressed_response() &&
+ received_compression == GRPC_COMPRESS_NONE) {
+ // Requested some compression, got NONE. This is an error.
+ gpr_log(GPR_ERROR,
+ "Failure: Requested compression but got uncompressed response "
+ "from server.");
+ abort();
+ }
+ }
+ if (!request->request_compressed_response()) {
GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
} else if (request->response_type() == PayloadType::COMPRESSABLE) {
// requested compression and compressable response => results should always
@@ -134,23 +128,43 @@ void InteropClient::Reset(std::shared_ptr<Channel> channel) {
serviceStub_.Reset(channel);
}
-InteropClient::InteropClient(std::shared_ptr<Channel> channel)
- : serviceStub_(channel, true) {}
-
InteropClient::InteropClient(std::shared_ptr<Channel> channel,
- bool new_stub_every_test_case)
- : serviceStub_(channel, new_stub_every_test_case) {}
+ bool new_stub_every_test_case,
+ bool do_not_abort_on_transient_failures)
+ : serviceStub_(channel, new_stub_every_test_case),
+ do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
-void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
+bool InteropClient::AssertStatusOk(const Status& s) {
if (s.ok()) {
- return;
+ return true;
+ }
+
+ // Note: At this point, s.error_code is definitely not StatusCode::OK (we
+ // already checked for s.ok() above). So, the following will call abort()
+ // (unless s.error_code() corresponds to a transient failure and
+ // 'do_not_abort_on_transient_failures' is true)
+ return AssertStatusCode(s, StatusCode::OK);
+}
+
+bool InteropClient::AssertStatusCode(const Status& s,
+ StatusCode expected_code) {
+ if (s.error_code() == expected_code) {
+ return true;
+ }
+
+ gpr_log(GPR_ERROR, "Error status code: %d (expected: %d), message: %s",
+ s.error_code(), expected_code, s.error_message().c_str());
+
+ // In case of transient transient/retryable failures (like a broken
+ // connection) we may or may not abort (see TransientFailureOrAbort())
+ if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
+ return TransientFailureOrAbort();
}
- gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(),
- s.error_message().c_str());
- GPR_ASSERT(0);
+
+ abort();
}
-void InteropClient::DoEmpty() {
+bool InteropClient::DoEmpty() {
gpr_log(GPR_DEBUG, "Sending an empty rpc...");
Empty request = Empty::default_instance();
@@ -158,17 +172,21 @@ void InteropClient::DoEmpty() {
ClientContext context;
Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
gpr_log(GPR_DEBUG, "Empty rpc done.");
+ return true;
}
-void InteropClient::PerformLargeUnary(SimpleRequest* request,
+bool InteropClient::PerformLargeUnary(SimpleRequest* request,
SimpleResponse* response) {
- PerformLargeUnary(request, response, NoopChecks);
+ return PerformLargeUnary(request, response, NoopChecks);
}
-void InteropClient::PerformLargeUnary(SimpleRequest* request,
+bool InteropClient::PerformLargeUnary(SimpleRequest* request,
SimpleResponse* response,
CheckerFn custom_checks_fn) {
ClientContext context;
@@ -180,32 +198,38 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
custom_checks_fn(inspector, request, response);
// Payload related checks.
- if (request->response_type() != PayloadType::RANDOM) {
- GPR_ASSERT(response->payload().type() == request->response_type());
- }
+ GPR_ASSERT(response->payload().type() == request->response_type());
switch (response->payload().type()) {
case PayloadType::COMPRESSABLE:
GPR_ASSERT(response->payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
break;
case PayloadType::UNCOMPRESSABLE: {
- std::ifstream rnd_file(kRandomFile);
- GPR_ASSERT(rnd_file.good());
- for (int i = 0; i < kLargeResponseSize; i++) {
- GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
- }
+ // We don't really check anything: We can't assert that the payload is
+ // uncompressed because it's the server's prerogative to decide on that,
+ // and different implementations decide differently (ie, Java always
+ // compresses when requested to do so, whereas C core throws away the
+ // compressed payload if the output is larger than the input).
+ // In addition, we don't compare the actual random bytes received because
+ // asserting that data is sent/received properly isn't the purpose of this
+ // test. Moreover, different implementations are also free to use
+ // different sets of random bytes.
} break;
default:
GPR_ASSERT(false);
}
+
+ return true;
}
-void InteropClient::DoComputeEngineCreds(
+bool InteropClient::DoComputeEngineCreds(
const grpc::string& default_service_account,
const grpc::string& oauth_scope) {
gpr_log(GPR_DEBUG,
@@ -215,7 +239,11 @@ void InteropClient::DoComputeEngineCreds(
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
request.set_response_type(PayloadType::COMPRESSABLE);
- PerformLargeUnary(&request, &response);
+
+ if (!PerformLargeUnary(&request, &response)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
GPR_ASSERT(!response.username().empty());
@@ -224,9 +252,10 @@ void InteropClient::DoComputeEngineCreds(
const char* oauth_scope_str = response.oauth_scope().c_str();
GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
+ return true;
}
-void InteropClient::DoOauth2AuthToken(const grpc::string& username,
+bool InteropClient::DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope) {
gpr_log(GPR_DEBUG,
"Sending a unary rpc with raw oauth2 access token credentials ...");
@@ -239,16 +268,20 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username,
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(!response.oauth_scope().empty());
GPR_ASSERT(username == response.username());
const char* oauth_scope_str = response.oauth_scope().c_str();
GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
+ return true;
}
-void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
+bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
SimpleRequest request;
SimpleResponse response;
@@ -263,42 +296,54 @@ void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
+ return true;
}
-void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
+bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
gpr_log(GPR_DEBUG,
"Sending a large unary rpc with JWT token credentials ...");
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
request.set_response_type(PayloadType::COMPRESSABLE);
- PerformLargeUnary(&request, &response);
+
+ if (!PerformLargeUnary(&request, &response)) {
+ return false;
+ }
+
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
+ return true;
}
-void InteropClient::DoLargeUnary() {
+bool InteropClient::DoLargeUnary() {
gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
SimpleRequest request;
SimpleResponse response;
request.set_response_type(PayloadType::COMPRESSABLE);
- PerformLargeUnary(&request, &response);
+ if (!PerformLargeUnary(&request, &response)) {
+ return false;
+ }
gpr_log(GPR_DEBUG, "Large unary done.");
+ return true;
}
-void InteropClient::DoLargeCompressedUnary() {
- const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
- const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
+bool InteropClient::DoLargeCompressedUnary() {
+ const bool request_compression[] = {false, true};
+ const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
- for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
+ for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) {
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
- CompressionType_Name(compression_types[j]).c_str(),
+ request_compression[j] ? "true" : "false",
PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.",
@@ -306,15 +351,33 @@ void InteropClient::DoLargeCompressedUnary() {
SimpleRequest request;
SimpleResponse response;
request.set_response_type(payload_types[i]);
- request.set_response_compression(compression_types[j]);
- PerformLargeUnary(&request, &response, CompressionChecks);
+ request.set_request_compressed_response(request_compression[j]);
+
+ if (!PerformLargeUnary(&request, &response, CompressionChecks)) {
+ gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix);
+ gpr_free(log_suffix);
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix);
gpr_free(log_suffix);
}
}
+
+ return true;
+}
+
+// Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
+// false
+bool InteropClient::TransientFailureOrAbort() {
+ if (do_not_abort_on_transient_failures_) {
+ return false;
+ }
+
+ abort();
}
-void InteropClient::DoRequestStreaming() {
+bool InteropClient::DoRequestStreaming() {
gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
ClientContext context;
@@ -328,19 +391,25 @@ void InteropClient::DoRequestStreaming() {
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
Payload* payload = request.mutable_payload();
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
- GPR_ASSERT(stream->Write(request));
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
aggregated_payload_size += request_stream_sizes[i];
}
stream->WritesDone();
+
Status s = stream->Finish();
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
- AssertOkOrPrintErrorStatus(s);
- gpr_log(GPR_DEBUG, "Request streaming done.");
+ return true;
}
-void InteropClient::DoResponseStreaming() {
- gpr_log(GPR_DEBUG, "Receiving response steaming rpc ...");
+bool InteropClient::DoResponseStreaming() {
+ gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
ClientContext context;
StreamingOutputCallRequest request;
@@ -358,30 +427,44 @@ void InteropClient::DoResponseStreaming() {
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
- GPR_ASSERT(response_stream_sizes.size() == i);
+
+ if (i < response_stream_sizes.size()) {
+ // stream->Read() failed before reading all the expected messages. This is
+ // most likely due to connection failure.
+ gpr_log(GPR_ERROR,
+ "DoResponseStreaming(): Read fewer streams (%d) than "
+ "response_stream_sizes.size() (%" PRIuPTR ")",
+ i, response_stream_sizes.size());
+ return TransientFailureOrAbort();
+ }
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Response streaming done.");
+ return true;
}
-void InteropClient::DoResponseCompressedStreaming() {
- const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
- const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
+bool InteropClient::DoResponseCompressedStreaming() {
+ const bool request_compression[] = {false, true};
+ const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
- for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
+ for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) {
ClientContext context;
InteropClientContextInspector inspector(context);
StreamingOutputCallRequest request;
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
- CompressionType_Name(compression_types[j]).c_str(),
+ request_compression[j] ? "true" : "false",
PayloadType_Name(payload_types[i]).c_str());
- gpr_log(GPR_DEBUG, "Receiving response steaming rpc %s.", log_suffix);
+ gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix);
request.set_response_type(payload_types[i]);
- request.set_response_compression(compression_types[j]);
+ request.set_request_compressed_response(request_compression[j]);
for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
ResponseParameters* response_parameter =
@@ -396,54 +479,65 @@ void InteropClient::DoResponseCompressedStreaming() {
size_t k = 0;
while (stream->Read(&response)) {
// Payload related checks.
- if (request.response_type() != PayloadType::RANDOM) {
- GPR_ASSERT(response.payload().type() == request.response_type());
- }
+ GPR_ASSERT(response.payload().type() == request.response_type());
switch (response.payload().type()) {
case PayloadType::COMPRESSABLE:
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[k], '\0'));
break;
- case PayloadType::UNCOMPRESSABLE: {
- std::ifstream rnd_file(kRandomFile);
- GPR_ASSERT(rnd_file.good());
- for (int n = 0; n < response_stream_sizes[k]; n++) {
- GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
- }
- } break;
+ case PayloadType::UNCOMPRESSABLE:
+ break;
default:
GPR_ASSERT(false);
}
// Compression related checks.
- GPR_ASSERT(request.response_compression() ==
- GetInteropCompressionTypeFromCompressionAlgorithm(
- inspector.GetCallCompressionAlgorithm()));
- if (request.response_compression() == NONE) {
+ if (request.request_compressed_response()) {
+ GPR_ASSERT(inspector.GetCallCompressionAlgorithm() >
+ GRPC_COMPRESS_NONE);
+ if (request.response_type() == PayloadType::COMPRESSABLE) {
+ // requested compression and compressable response => results should
+ // always be compressed.
+ GPR_ASSERT(inspector.GetMessageFlags() &
+ GRPC_WRITE_INTERNAL_COMPRESS);
+ }
+ } else {
+ // requested *no* compression.
GPR_ASSERT(
!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
- } else if (request.response_type() == PayloadType::COMPRESSABLE) {
- // requested compression and compressable response => results should
- // always be compressed.
- GPR_ASSERT(inspector.GetMessageFlags() &
- GRPC_WRITE_INTERNAL_COMPRESS);
}
++k;
}
- GPR_ASSERT(response_stream_sizes.size() == k);
- Status s = stream->Finish();
-
- AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix);
gpr_free(log_suffix);
+
+ if (k < response_stream_sizes.size()) {
+ // stream->Read() failed before reading all the expected messages. This
+ // is most likely due to a connection failure.
+ gpr_log(GPR_ERROR,
+ "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR
+ ") is "
+ "less than the expected messages (i.e "
+ "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR
+ ", j=%" PRIuPTR ")",
+ k, response_stream_sizes.size(), i, j);
+ return TransientFailureOrAbort();
+ }
+
+ Status s = stream->Finish();
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
}
}
+
+ return true;
}
-void InteropClient::DoResponseStreamingWithSlowConsumer() {
- gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ...");
+bool InteropClient::DoResponseStreamingWithSlowConsumer() {
+ gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
ClientContext context;
StreamingOutputCallRequest request;
@@ -464,14 +558,26 @@ void InteropClient::DoResponseStreamingWithSlowConsumer() {
usleep(kReceiveDelayMilliSeconds * 1000);
++i;
}
- GPR_ASSERT(kNumResponseMessages == i);
+
+ if (i < kNumResponseMessages) {
+ gpr_log(GPR_ERROR,
+ "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
+ "less than the expected messages (i.e kNumResponseMessages = %d)",
+ i, kNumResponseMessages);
+
+ return TransientFailureOrAbort();
+ }
+
Status s = stream->Finish();
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
- AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Response streaming done.");
+ return true;
}
-void InteropClient::DoHalfDuplex() {
+bool InteropClient::DoHalfDuplex() {
gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
ClientContext context;
@@ -483,7 +589,11 @@ void InteropClient::DoHalfDuplex() {
ResponseParameters* response_parameter = request.add_response_parameters();
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
- GPR_ASSERT(stream->Write(request));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
+ return TransientFailureOrAbort();
+ }
}
stream->WritesDone();
@@ -494,13 +604,27 @@ void InteropClient::DoHalfDuplex() {
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
- GPR_ASSERT(response_stream_sizes.size() == i);
+
+ if (i < response_stream_sizes.size()) {
+ // stream->Read() failed before reading all the expected messages. This is
+ // most likely due to a connection failure
+ gpr_log(GPR_ERROR,
+ "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
+ "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
+ i, response_stream_sizes.size());
+ return TransientFailureOrAbort();
+ }
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
+ return true;
}
-void InteropClient::DoPingPong() {
+bool InteropClient::DoPingPong() {
gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
ClientContext context;
@@ -513,24 +637,40 @@ void InteropClient::DoPingPong() {
ResponseParameters* response_parameter = request.add_response_parameters();
Payload* payload = request.mutable_payload();
StreamingOutputCallResponse response;
+
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
- GPR_ASSERT(stream->Write(request));
- GPR_ASSERT(stream->Read(&response));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
+ return TransientFailureOrAbort();
+ }
+
+ if (!stream->Read(&response)) {
+ gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
+ return TransientFailureOrAbort();
+ }
+
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
}
stream->WritesDone();
+
GPR_ASSERT(!stream->Read(&response));
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Ping pong streaming done.");
+ return true;
}
-void InteropClient::DoCancelAfterBegin() {
- gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
+bool InteropClient::DoCancelAfterBegin() {
+ gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
ClientContext context;
StreamingInputCallRequest request;
@@ -542,11 +682,16 @@ void InteropClient::DoCancelAfterBegin() {
gpr_log(GPR_DEBUG, "Trying to cancel...");
context.TryCancel();
Status s = stream->Finish();
- GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
+
+ if (!AssertStatusCode(s, StatusCode::CANCELLED)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Canceling streaming done.");
+ return true;
}
-void InteropClient::DoCancelAfterFirstResponse() {
+bool InteropClient::DoCancelAfterFirstResponse() {
gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
ClientContext context;
@@ -560,17 +705,27 @@ void InteropClient::DoCancelAfterFirstResponse() {
response_parameter->set_size(31415);
request.mutable_payload()->set_body(grpc::string(27182, '\0'));
StreamingOutputCallResponse response;
- GPR_ASSERT(stream->Write(request));
- GPR_ASSERT(stream->Read(&response));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
+
+ if (!stream->Read(&response)) {
+ gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
+ return TransientFailureOrAbort();
+ }
GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
+
gpr_log(GPR_DEBUG, "Trying to cancel...");
context.TryCancel();
Status s = stream->Finish();
gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
+ return true;
}
-void InteropClient::DoTimeoutOnSleepingServer() {
+bool InteropClient::DoTimeoutOnSleepingServer() {
gpr_log(GPR_DEBUG,
"Sending Ping Pong streaming rpc with a short deadline...");
@@ -587,11 +742,15 @@ void InteropClient::DoTimeoutOnSleepingServer() {
stream->Write(request);
Status s = stream->Finish();
- GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
+ if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
+ return true;
}
-void InteropClient::DoEmptyStream() {
+bool InteropClient::DoEmptyStream() {
gpr_log(GPR_DEBUG, "Starting empty_stream.");
ClientContext context;
@@ -601,12 +760,17 @@ void InteropClient::DoEmptyStream() {
stream->WritesDone();
StreamingOutputCallResponse response;
GPR_ASSERT(stream->Read(&response) == false);
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "empty_stream done.");
+ return true;
}
-void InteropClient::DoStatusWithMessage() {
+bool InteropClient::DoStatusWithMessage() {
gpr_log(GPR_DEBUG,
"Sending RPC with a request for status code 2 and message");
@@ -620,12 +784,16 @@ void InteropClient::DoStatusWithMessage() {
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
+ if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) {
+ return false;
+ }
+
GPR_ASSERT(s.error_message() == test_msg);
gpr_log(GPR_DEBUG, "Done testing Status and Message");
+ return true;
}
-void InteropClient::DoCustomMetadata() {
+bool InteropClient::DoCustomMetadata() {
const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
const grpc::string kInitialMetadataValue("test_initial_metadata_value");
const grpc::string kEchoTrailingBinMetadataKey(
@@ -645,7 +813,10 @@ void InteropClient::DoCustomMetadata() {
request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
@@ -675,14 +846,29 @@ void InteropClient::DoCustomMetadata() {
grpc::string payload(kLargeRequestSize, '\0');
request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
StreamingOutputCallResponse response;
- GPR_ASSERT(stream->Write(request));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
+
stream->WritesDone();
- GPR_ASSERT(stream->Read(&response));
+
+ if (!stream->Read(&response)) {
+ gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
+ return TransientFailureOrAbort();
+ }
+
GPR_ASSERT(response.payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
+
GPR_ASSERT(!stream->Read(&response));
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
@@ -695,6 +881,8 @@ void InteropClient::DoCustomMetadata() {
gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
}
+
+ return true;
}
} // namespace testing
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index a3794fd93f..ae75762bb8 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -51,41 +51,42 @@ using CheckerFn =
class InteropClient {
public:
- explicit InteropClient(std::shared_ptr<Channel> channel);
- explicit InteropClient(
- std::shared_ptr<Channel> channel,
- bool new_stub_every_test_case); // If new_stub_every_test_case is true,
- // a new TestService::Stub object is
- // created for every test case below
+ /// If new_stub_every_test_case is true, a new TestService::Stub object is
+ /// created for every test case
+ /// If do_not_abort_on_transient_failures is true, abort() is not called in
+ /// case of transient failures (like connection failures)
+ explicit InteropClient(std::shared_ptr<Channel> channel,
+ bool new_stub_every_test_case,
+ bool do_not_abort_on_transient_failures);
~InteropClient() {}
void Reset(std::shared_ptr<Channel> channel);
- void DoEmpty();
- void DoLargeUnary();
- void DoLargeCompressedUnary();
- void DoPingPong();
- void DoHalfDuplex();
- void DoRequestStreaming();
- void DoResponseStreaming();
- void DoResponseCompressedStreaming();
- void DoResponseStreamingWithSlowConsumer();
- void DoCancelAfterBegin();
- void DoCancelAfterFirstResponse();
- void DoTimeoutOnSleepingServer();
- void DoEmptyStream();
- void DoStatusWithMessage();
- void DoCustomMetadata();
+ bool DoEmpty();
+ bool DoLargeUnary();
+ bool DoLargeCompressedUnary();
+ bool DoPingPong();
+ bool DoHalfDuplex();
+ bool DoRequestStreaming();
+ bool DoResponseStreaming();
+ bool DoResponseCompressedStreaming();
+ bool DoResponseStreamingWithSlowConsumer();
+ bool DoCancelAfterBegin();
+ bool DoCancelAfterFirstResponse();
+ bool DoTimeoutOnSleepingServer();
+ bool DoEmptyStream();
+ bool DoStatusWithMessage();
+ bool DoCustomMetadata();
// Auth tests.
// username is a string containing the user email
- void DoJwtTokenCreds(const grpc::string& username);
- void DoComputeEngineCreds(const grpc::string& default_service_account,
+ bool DoJwtTokenCreds(const grpc::string& username);
+ bool DoComputeEngineCreds(const grpc::string& default_service_account,
const grpc::string& oauth_scope);
// username the GCE default service account email
- void DoOauth2AuthToken(const grpc::string& username,
+ bool DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope);
// username is a string containing the user email
- void DoPerRpcCreds(const grpc::string& json_key);
+ bool DoPerRpcCreds(const grpc::string& json_key);
private:
class ServiceStub {
@@ -105,13 +106,18 @@ class InteropClient {
// Get() call
};
- void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
+ bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
/// Run \a custom_check_fn as an additional check.
- void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
+ bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
CheckerFn custom_checks_fn);
- void AssertOkOrPrintErrorStatus(const Status& s);
+ bool AssertStatusOk(const Status& s);
+ bool AssertStatusCode(const Status& s, StatusCode expected_code);
+ bool TransientFailureOrAbort();
ServiceStub serviceStub_;
+
+ /// If true, abort() is not called for transient failures
+ bool do_not_abort_on_transient_failures_;
};
} // namespace testing
diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc
index cc304f2e89..7a0cb994df 100644
--- a/test/cpp/interop/metrics_client.cc
+++ b/test/cpp/interop/metrics_client.cc
@@ -42,13 +42,15 @@
#include "test/cpp/util/metrics_server.h"
#include "test/cpp/util/test_config.h"
-DEFINE_string(metrics_server_address, "",
+int kDeadlineSecs = 10;
+
+DEFINE_string(metrics_server_address, "localhost:8081",
"The metrics server addresses in the fomrat <hostname>:<port>");
+DEFINE_int32(deadline_secs, kDeadlineSecs,
+ "The deadline (in seconds) for RCP call");
DEFINE_bool(total_only, false,
"If true, this prints only the total value of all gauges");
-int kDeadlineSecs = 10;
-
using grpc::testing::EmptyMessage;
using grpc::testing::GaugeResponse;
using grpc::testing::MetricsService;
@@ -56,12 +58,13 @@ using grpc::testing::MetricsServiceImpl;
// Prints the values of all Gauges (unless total_only is set to 'true' in which
// case this only prints the sum of all gauge values).
-bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) {
+bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only,
+ int deadline_secs) {
grpc::ClientContext context;
EmptyMessage message;
std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs);
+ std::chrono::system_clock::now() + std::chrono::seconds(deadline_secs);
context.set_deadline(deadline);
@@ -73,7 +76,7 @@ bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) {
while (reader->Read(&gauge_response)) {
if (gauge_response.value_case() == GaugeResponse::kLongValue) {
if (!total_only) {
- gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(),
+ gpr_log(GPR_INFO, "%s: %lld", gauge_response.name().c_str(),
gauge_response.long_value());
}
overall_qps += gauge_response.long_value();
@@ -108,7 +111,8 @@ int main(int argc, char** argv) {
std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel(
FLAGS_metrics_server_address, grpc::InsecureChannelCredentials()));
- if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) {
+ if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only,
+ FLAGS_deadline_secs)) {
return 1;
}
diff --git a/test/cpp/interop/server_main.cc b/test/cpp/interop/server_main.cc
index 889874fe49..bbedda14d2 100644
--- a/test/cpp/interop/server_main.cc
+++ b/test/cpp/interop/server_main.cc
@@ -110,14 +110,7 @@ void MaybeEchoMetadata(ServerContext* context) {
}
}
-bool SetPayload(PayloadType type, int size, Payload* payload) {
- PayloadType response_type;
- if (type == PayloadType::RANDOM) {
- response_type =
- rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE;
- } else {
- response_type = type;
- }
+bool SetPayload(PayloadType response_type, int size, Payload* payload) {
payload->set_type(response_type);
switch (response_type) {
case PayloadType::COMPRESSABLE: {
@@ -141,18 +134,9 @@ bool SetPayload(PayloadType type, int size, Payload* payload) {
template <typename RequestType>
void SetResponseCompression(ServerContext* context,
const RequestType& request) {
- switch (request.response_compression()) {
- case grpc::testing::NONE:
- context->set_compression_algorithm(GRPC_COMPRESS_NONE);
- break;
- case grpc::testing::GZIP:
- context->set_compression_algorithm(GRPC_COMPRESS_GZIP);
- break;
- case grpc::testing::DEFLATE:
- context->set_compression_algorithm(GRPC_COMPRESS_DEFLATE);
- break;
- default:
- abort();
+ if (request.request_compressed_response()) {
+ // Any level would do, let's go for HIGH because we are overachievers.
+ context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
}
}
diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc
index 04671fb935..aa95682e74 100644
--- a/test/cpp/interop/stress_interop_client.cc
+++ b/test/cpp/interop/stress_interop_client.cc
@@ -84,49 +84,38 @@ StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
- long sleep_duration_ms, long metrics_collection_interval_secs)
+ long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id),
server_address_(server_address),
channel_(channel),
- interop_client_(new InteropClient(channel, false)),
+ interop_client_(new InteropClient(channel, false,
+ do_not_abort_on_transient_failures)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),
- sleep_duration_ms_(sleep_duration_ms),
- metrics_collection_interval_secs_(metrics_collection_interval_secs) {}
+ sleep_duration_ms_(sleep_duration_ms) {}
-void StressTestInteropClient::MainLoop(std::shared_ptr<Gauge> qps_gauge) {
+void StressTestInteropClient::MainLoop(std::shared_ptr<QpsGauge> qps_gauge) {
gpr_log(GPR_INFO, "Running test %d. ServerAddr: %s", test_id_,
server_address_.c_str());
- gpr_timespec test_end_time =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(test_duration_secs_, GPR_TIMESPAN));
+ gpr_timespec test_end_time;
+ if (test_duration_secs_ < 0) {
+ test_end_time = gpr_inf_future(GPR_CLOCK_REALTIME);
+ } else {
+ test_end_time =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(test_duration_secs_, GPR_TIMESPAN));
+ }
- gpr_timespec current_time = gpr_now(GPR_CLOCK_REALTIME);
- gpr_timespec next_stat_collection_time = current_time;
- gpr_timespec collection_interval =
- gpr_time_from_seconds(metrics_collection_interval_secs_, GPR_TIMESPAN);
- long num_calls_per_interval = 0;
+ qps_gauge->Reset();
- while (test_duration_secs_ < 0 ||
- gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), test_end_time) < 0) {
+ while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), test_end_time) < 0) {
// Select the test case to execute based on the weights and execute it
TestCaseType test_case = test_selector_.GetNextTest();
gpr_log(GPR_DEBUG, "%d - Executing the test case %d", test_id_, test_case);
RunTest(test_case);
- num_calls_per_interval++;
-
- // See if its time to collect stats yet
- current_time = gpr_now(GPR_CLOCK_REALTIME);
- if (gpr_time_cmp(next_stat_collection_time, current_time) < 0) {
- qps_gauge->Set(num_calls_per_interval /
- metrics_collection_interval_secs_);
-
- num_calls_per_interval = 0;
- next_stat_collection_time =
- gpr_time_add(current_time, collection_interval);
- }
+ qps_gauge->Incr();
// Sleep between successive calls if needed
if (sleep_duration_ms_ > 0) {
@@ -138,31 +127,67 @@ void StressTestInteropClient::MainLoop(std::shared_ptr<Gauge> qps_gauge) {
}
}
-// TODO(sree): Add all interop tests
-void StressTestInteropClient::RunTest(TestCaseType test_case) {
+bool StressTestInteropClient::RunTest(TestCaseType test_case) {
+ bool is_success = false;
switch (test_case) {
case EMPTY_UNARY: {
- interop_client_->DoEmpty();
+ is_success = interop_client_->DoEmpty();
break;
}
case LARGE_UNARY: {
- interop_client_->DoLargeUnary();
+ is_success = interop_client_->DoLargeUnary();
break;
}
case LARGE_COMPRESSED_UNARY: {
- interop_client_->DoLargeCompressedUnary();
+ is_success = interop_client_->DoLargeCompressedUnary();
break;
}
case CLIENT_STREAMING: {
- interop_client_->DoRequestStreaming();
+ is_success = interop_client_->DoRequestStreaming();
break;
}
case SERVER_STREAMING: {
- interop_client_->DoResponseStreaming();
+ is_success = interop_client_->DoResponseStreaming();
+ break;
+ }
+ case SERVER_COMPRESSED_STREAMING: {
+ is_success = interop_client_->DoResponseCompressedStreaming();
+ break;
+ }
+ case SLOW_CONSUMER: {
+ is_success = interop_client_->DoResponseStreamingWithSlowConsumer();
+ break;
+ }
+ case HALF_DUPLEX: {
+ is_success = interop_client_->DoHalfDuplex();
+ break;
+ }
+ case PING_PONG: {
+ is_success = interop_client_->DoPingPong();
+ break;
+ }
+ case CANCEL_AFTER_BEGIN: {
+ is_success = interop_client_->DoCancelAfterBegin();
+ break;
+ }
+ case CANCEL_AFTER_FIRST_RESPONSE: {
+ is_success = interop_client_->DoCancelAfterFirstResponse();
+ break;
+ }
+ case TIMEOUT_ON_SLEEPING_SERVER: {
+ is_success = interop_client_->DoTimeoutOnSleepingServer();
break;
}
case EMPTY_STREAM: {
- interop_client_->DoEmptyStream();
+ is_success = interop_client_->DoEmptyStream();
+ break;
+ }
+ case STATUS_CODE_AND_MESSAGE: {
+ is_success = interop_client_->DoStatusWithMessage();
+ break;
+ }
+ case CUSTOM_METADATA: {
+ is_success = interop_client_->DoCustomMetadata();
break;
}
default: {
@@ -171,6 +196,8 @@ void StressTestInteropClient::RunTest(TestCaseType test_case) {
break;
}
}
+
+ return is_success;
}
} // namespace testing
diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h
index 6fd303d6b7..aa93b58b4a 100644
--- a/test/cpp/interop/stress_interop_client.h
+++ b/test/cpp/interop/stress_interop_client.h
@@ -49,7 +49,6 @@ namespace testing {
using std::pair;
using std::vector;
-// TODO(sreek): Add more test cases here in future
enum TestCaseType {
UNKNOWN_TEST = -1,
EMPTY_UNARY = 0,
@@ -57,7 +56,16 @@ enum TestCaseType {
LARGE_COMPRESSED_UNARY = 2,
CLIENT_STREAMING = 3,
SERVER_STREAMING = 4,
- EMPTY_STREAM = 5
+ SERVER_COMPRESSED_STREAMING = 5,
+ SLOW_CONSUMER = 6,
+ HALF_DUPLEX = 7,
+ PING_PONG = 8,
+ CANCEL_AFTER_BEGIN = 9,
+ CANCEL_AFTER_FIRST_RESPONSE = 10,
+ TIMEOUT_ON_SLEEPING_SERVER = 11,
+ EMPTY_STREAM = 12,
+ STATUS_CODE_AND_MESSAGE = 13,
+ CUSTOM_METADATA = 14
};
const vector<pair<TestCaseType, grpc::string>> kTestCaseList = {
@@ -66,7 +74,16 @@ const vector<pair<TestCaseType, grpc::string>> kTestCaseList = {
{LARGE_COMPRESSED_UNARY, "large_compressed_unary"},
{CLIENT_STREAMING, "client_streaming"},
{SERVER_STREAMING, "server_streaming"},
- {EMPTY_STREAM, "empty_stream"}};
+ {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"},
+ {SLOW_CONSUMER, "slow_consumer"},
+ {HALF_DUPLEX, "half_duplex"},
+ {PING_PONG, "ping_pong"},
+ {CANCEL_AFTER_BEGIN, "cancel_after_begin"},
+ {CANCEL_AFTER_FIRST_RESPONSE, "cancel_after_first_response"},
+ {TIMEOUT_ON_SLEEPING_SERVER, "timeout_on_sleeping_server"},
+ {EMPTY_STREAM, "empty_stream"},
+ {STATUS_CODE_AND_MESSAGE, "status_code_and_message"},
+ {CUSTOM_METADATA, "custom_metadata"}};
class WeightedRandomTestSelector {
public:
@@ -88,14 +105,14 @@ class StressTestInteropClient {
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms,
- long metrics_collection_interval_secs);
+ bool do_not_abort_on_transient_failures);
// The main function. Use this as the thread entry point.
- // qps_gauge is the Gauge to record the requests per second metric
- void MainLoop(std::shared_ptr<Gauge> qps_gauge);
+ // qps_gauge is the QpsGauge to record the requests per second metric
+ void MainLoop(std::shared_ptr<QpsGauge> qps_gauge);
private:
- void RunTest(TestCaseType test_case);
+ bool RunTest(TestCaseType test_case);
int test_id_;
const grpc::string& server_address_;
@@ -104,7 +121,6 @@ class StressTestInteropClient {
const WeightedRandomTestSelector& test_selector_;
long test_duration_secs_;
long sleep_duration_ms_;
- long metrics_collection_interval_secs_;
};
} // namespace testing
diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc
index 38caf31b76..7787931900 100644
--- a/test/cpp/interop/stress_test.cc
+++ b/test/cpp/interop/stress_test.cc
@@ -56,9 +56,6 @@ extern void gpr_default_log(gpr_log_func_args* args);
DEFINE_int32(metrics_port, 8081, "The metrics server port.");
-DEFINE_int32(metrics_collection_interval_secs, 5,
- "How often (in seconds) should metrics be recorded.");
-
DEFINE_int32(sleep_duration_ms, 0,
"The duration (in millisec) between two"
" consecutive test calls (per server) issued by the server.");
@@ -92,7 +89,16 @@ DEFINE_string(test_cases, "",
" large_compressed_unary\n"
" client_streaming\n"
" server_streaming\n"
+ " server_compressed_streaming\n"
+ " slow_consumer\n"
+ " half_duplex\n"
+ " ping_pong\n"
+ " cancel_after_begin\n"
+ " cancel_after_first_response\n"
+ " timeout_on_sleeping_server\n"
" empty_stream\n"
+ " status_code_and_message\n"
+ " custom_metadata\n"
" Example: \"empty_unary:20,large_unary:10,empty_stream:70\"\n"
" The above will execute 'empty_unary', 20% of the time,"
" 'large_unary', 10% of the time and 'empty_stream' the remaining"
@@ -104,6 +110,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO,
"The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 "
"(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)");
+DEFINE_bool(do_not_abort_on_transient_failures, true,
+ "If set to 'true', abort() is not called in case of transient "
+ "failures like temporary connection failures.");
+
using grpc::testing::kTestCaseList;
using grpc::testing::MetricsService;
using grpc::testing::MetricsServiceImpl;
@@ -192,6 +202,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses,
gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str());
gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms);
gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs);
+ gpr_log(GPR_INFO, "num_channels_per_server: %d",
+ FLAGS_num_channels_per_server);
+ gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel);
+ gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level);
+ gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s",
+ FLAGS_do_not_abort_on_transient_failures ? "true" : "false");
int num = 0;
for (auto it = addresses.begin(); it != addresses.end(); it++) {
@@ -275,19 +291,19 @@ int main(int argc, char** argv) {
stub_idx++) {
StressTestInteropClient* client = new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
- FLAGS_sleep_duration_ms, FLAGS_metrics_collection_interval_secs);
+ FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures);
- bool is_already_created;
- // Gauge name
+ bool is_already_created = false;
+ // QpsGauge name
std::snprintf(buffer, sizeof(buffer),
"/stress_test/server_%d/channel_%d/stub_%d/qps",
server_idx, channel_idx, stub_idx);
test_threads.emplace_back(grpc::thread(
&StressTestInteropClient::MainLoop, client,
- metrics_service.CreateGauge(buffer, &is_already_created)));
+ metrics_service.CreateQpsGauge(buffer, &is_already_created)));
- // The Gauge should not have been already created
+ // The QpsGauge should not have been already created
GPR_ASSERT(!is_already_created);
}
}
diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc
deleted file mode 100644
index 4b6bae0d5c..0000000000
--- a/test/cpp/qps/async_streaming_ping_pong_test.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <set>
-
-#include <grpc/support/log.h>
-
-#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/report.h"
-#include "test/cpp/util/benchmark_config.h"
-
-namespace grpc {
-namespace testing {
-
-static const int WARMUP = 5;
-static const int BENCHMARK = 5;
-
-static void RunAsyncStreamingPingPong() {
- gpr_log(GPR_INFO, "Running Async Streaming Ping Pong");
-
- ClientConfig client_config;
- client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1);
- client_config.set_client_channels(1);
- client_config.set_async_client_threads(1);
- client_config.set_rpc_type(STREAMING);
- client_config.mutable_load_params()->mutable_closed_loop();
-
- ServerConfig server_config;
- server_config.set_server_type(ASYNC_SERVER);
- server_config.set_async_server_threads(1);
-
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
-
- GetReporter()->ReportQPS(*result);
- GetReporter()->ReportLatency(*result);
-}
-
-} // namespace testing
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
-
- grpc::testing::RunAsyncStreamingPingPong();
- return 0;
-}
diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc
deleted file mode 100644
index 571a8b7300..0000000000
--- a/test/cpp/qps/async_unary_ping_pong_test.cc
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <set>
-
-#include <grpc/support/log.h>
-
-#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/report.h"
-#include "test/cpp/util/benchmark_config.h"
-
-namespace grpc {
-namespace testing {
-
-static const int WARMUP = 5;
-static const int BENCHMARK = 5;
-
-static void RunAsyncUnaryPingPong() {
- gpr_log(GPR_INFO, "Running Async Unary Ping Pong");
-
- ClientConfig client_config;
- client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1);
- client_config.set_client_channels(1);
- client_config.set_async_client_threads(1);
- client_config.set_rpc_type(UNARY);
- client_config.mutable_load_params()->mutable_closed_loop();
-
- ServerConfig server_config;
- server_config.set_server_type(ASYNC_SERVER);
- server_config.set_async_server_threads(1);
-
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
-
- GetReporter()->ReportQPS(*result);
- GetReporter()->ReportLatency(*result);
-}
-} // namespace testing
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
-
- grpc::testing::RunAsyncUnaryPingPong();
- return 0;
-}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 5a9027a4a2..2a89eb8018 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -38,7 +38,9 @@
#include <mutex>
#include <vector>
+#include <grpc++/channel.h>
#include <grpc++/support/byte_buffer.h>
+#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -280,7 +282,7 @@ class ClientImpl : public Client {
create_stub_(create_stub) {
for (int i = 0; i < config.client_channels(); i++) {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
- config, create_stub_);
+ config, create_stub_, i);
}
ClientRequestCreator<RequestType> create_req(&request_,
@@ -303,14 +305,21 @@ class ClientImpl : public Client {
}
void init(const grpc::string& target, const ClientConfig& config,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
- create_stub) {
+ create_stub,
+ int shard) {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
+ ChannelArguments args;
+ args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
channel_ = CreateTestChannel(
target, config.security_params().server_host_override(),
- config.has_security_params(),
- !config.security_params().use_test_ca());
+ config.has_security_params(), !config.security_params().use_test_ca(),
+ std::shared_ptr<CallCredentials>(), args);
+ gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
+ GPR_ASSERT(channel_->WaitForConnected(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(30, GPR_TIMESPAN))));
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index e72cef2811..6ad4c320b5 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -42,7 +42,6 @@
#include <thread>
#include <vector>
-#include <gflags/gflags.h>
#include <grpc++/alarm.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
@@ -84,7 +83,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
- CompletionQueue*)> start_req,
+ CompletionQueue*)>
+ start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -165,7 +165,8 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
AsyncClient(const ClientConfig& config,
std::function<ClientRpcContext*(
StubType*, std::function<gpr_timespec()> next_issue,
- const RequestType&)> setup_ctx,
+ const RequestType&)>
+ setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub)
: ClientImpl<StubType, RequestType>(config, create_stub),
@@ -278,7 +279,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
- void*)> start_req,
+ void*)>
+ start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -405,7 +407,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
- const grpc::string& method_name, CompletionQueue*, void*)> start_req,
+ const grpc::string& method_name, CompletionQueue*, void*)>
+ start_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done)
: context_(),
stub_(stub),
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index fb161f70ee..c88e95b80e 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -40,7 +40,6 @@
#include <thread>
#include <vector>
-#include <gflags/gflags.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/server.h>
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 2583ceb819..83dbdc3e59 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -43,7 +43,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
-#include <gtest/gtest.h>
#include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -83,6 +82,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
auto stub = WorkerService::NewStub(
CreateChannel(*it, InsecureChannelCredentials()));
grpc::ClientContext ctx;
+ ctx.set_fail_fast(false);
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
@@ -166,6 +166,7 @@ namespace runsc {
static ClientContext* AllocContext(list<ClientContext>* contexts) {
contexts->emplace_back();
auto context = &contexts->back();
+ context->set_fail_fast(false);
return context;
}
@@ -243,8 +244,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
- gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(),
- i);
+ gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
+ workers[i].c_str(), i);
servers[i].stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));
@@ -306,8 +307,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
const auto& worker = workers[i + num_servers];
- gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", worker.c_str(),
- i + num_servers);
+ gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
+ worker.c_str(), i + num_servers);
clients[i].stub = WorkerService::NewStub(
CreateChannel(worker, InsecureChannelCredentials()));
ClientConfig per_client_config = client_config;
@@ -435,6 +436,7 @@ void RunQuit() {
CreateChannel(workers[i], InsecureChannelCredentials()));
Void dummy;
grpc::ClientContext ctx;
+ ctx.set_fail_fast(false);
GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
}
}
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index 9d6bf2ab73..34b8151441 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -43,21 +43,46 @@ sys.path.append(run_tests_root)
import performance.scenario_config as scenario_config
+def _scenario_json_string(scenario_json):
+ # tweak parameters to get fast test times
+ scenario_json['warmup_seconds'] = 1
+ scenario_json['benchmark_seconds'] = 1
+ return json.dumps(scenario_config.remove_nonproto_fields(scenario_json))
+
+def threads_of_type(scenario_json, path):
+ d = scenario_json
+ for el in path.split('/'):
+ if el not in d:
+ return 0
+ d = d[el]
+ return d
+
+def guess_cpu(scenario_json):
+ client = threads_of_type(scenario_json, 'client_config/async_client_threads')
+ server = threads_of_type(scenario_json, 'server_config/async_server_threads')
+ # make an arbitrary guess if set to auto-detect
+ # about the size of the jenkins instances we have for unit tests
+ if client == 0: client = 8
+ if server == 0: server = 8
+ return (scenario_json['num_clients'] * client +
+ scenario_json['num_servers'] * server)
+
print yaml.dump({
'tests': [
{
'name': 'json_run_localhost',
- 'shortname': 'json_run_localhost:%s' % js['name'],
- 'args': ['--scenario_json', pipes.quote(json.dumps(js))],
+ 'shortname': 'json_run_localhost:%s' % scenario_json['name'],
+ 'args': ['--scenario_json',
+ pipes.quote(_scenario_json_string(scenario_json))],
'ci_platforms': ['linux', 'mac', 'posix', 'windows'],
'platforms': ['linux', 'mac', 'posix', 'windows'],
'flaky': False,
'language': 'c++',
'boringssl': True,
'defaults': 'boringssl',
- 'cpu_cost': 1000.0,
+ 'cpu_cost': guess_cpu(scenario_json),
'exclude_configs': []
}
- for js in scenario_config.CXXLanguage().scenarios()
+ for scenario_json in scenario_config.CXXLanguage().scenarios()
]
})
diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
deleted file mode 100644
index ea373ece84..0000000000
--- a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <set>
-
-#include <grpc/support/log.h>
-
-#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/report.h"
-#include "test/cpp/util/benchmark_config.h"
-
-namespace grpc {
-namespace testing {
-
-static const int WARMUP = 5;
-static const int BENCHMARK = 5;
-
-static void RunGenericAsyncStreamingPingPong() {
- gpr_log(GPR_INFO, "Running Generic Async Streaming Ping Pong");
-
- ClientConfig client_config;
- client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1);
- client_config.set_client_channels(1);
- client_config.set_async_client_threads(1);
- client_config.set_rpc_type(STREAMING);
- client_config.mutable_load_params()->mutable_closed_loop();
- auto bbuf = client_config.mutable_payload_config()->mutable_bytebuf_params();
- bbuf->set_resp_size(0);
- bbuf->set_req_size(0);
-
- ServerConfig server_config;
- server_config.set_server_type(ASYNC_GENERIC_SERVER);
- server_config.set_async_server_threads(1);
- *server_config.mutable_payload_config() = client_config.payload_config();
-
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
-
- GetReporter()->ReportQPS(*result);
- GetReporter()->ReportLatency(*result);
-}
-
-} // namespace testing
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
-
- grpc::testing::RunGenericAsyncStreamingPingPong();
- return 0;
-}
diff --git a/test/cpp/qps/parse_json.cc b/test/cpp/qps/parse_json.cc
index df7a62f0a0..e4fc35fa41 100644
--- a/test/cpp/qps/parse_json.cc
+++ b/test/cpp/qps/parse_json.cc
@@ -31,8 +31,6 @@
*
*/
-#include <grpc++/support/config_protobuf.h>
-
#include "test/cpp/qps/parse_json.h"
#include <string>
@@ -57,11 +55,26 @@ void ParseJson(const grpc::string& json, const grpc::string& type,
grpc::string errmsg(status.error_message());
gpr_log(GPR_ERROR, "Failed to convert json to binary: errcode=%d msg=%s",
status.error_code(), errmsg.c_str());
- gpr_log(GPR_ERROR, "JSON: ", json.c_str());
+ gpr_log(GPR_ERROR, "JSON: %s", json.c_str());
abort();
}
GPR_ASSERT(msg->ParseFromString(binary));
}
+grpc::string SerializeJson(const GRPC_CUSTOM_MESSAGE& msg,
+ const grpc::string& type) {
+ std::unique_ptr<google::protobuf::util::TypeResolver> type_resolver(
+ google::protobuf::util::NewTypeResolverForDescriptorPool(
+ "type.googleapis.com",
+ google::protobuf::DescriptorPool::generated_pool()));
+ grpc::string binary;
+ grpc::string json_string;
+ msg.SerializeToString(&binary);
+ auto status =
+ BinaryToJsonString(type_resolver.get(), type, binary, &json_string);
+ GPR_ASSERT(status.ok());
+ return json_string;
+}
+
} // testing
} // grpc
diff --git a/test/cpp/qps/parse_json.h b/test/cpp/qps/parse_json.h
index 4b8ca79f21..ce1821f961 100644
--- a/test/cpp/qps/parse_json.h
+++ b/test/cpp/qps/parse_json.h
@@ -34,8 +34,8 @@
#ifndef TEST_QPS_PARSE_JSON_H
#define TEST_QPS_PARSE_JSON_H
+#include <grpc++/impl/codegen/config_protobuf.h>
#include <grpc++/support/config.h>
-#include <grpc++/support/config_protobuf.h>
namespace grpc {
namespace testing {
@@ -43,6 +43,9 @@ namespace testing {
void ParseJson(const grpc::string& json, const grpc::string& type,
GRPC_CUSTOM_MESSAGE* msg);
+grpc::string SerializeJson(const GRPC_CUSTOM_MESSAGE& msg,
+ const grpc::string& type);
+
} // testing
} // grpc
diff --git a/test/cpp/qps/perf_db_client.cc b/test/cpp/qps/perf_db_client.cc
deleted file mode 100644
index 98efd8c3e3..0000000000
--- a/test/cpp/qps/perf_db_client.cc
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "test/cpp/qps/perf_db_client.h"
-
-namespace grpc {
-namespace testing {
-
-// sets the client and server config information
-void PerfDbClient::setConfigs(const ClientConfig& client_config,
- const ServerConfig& server_config) {
- client_config_ = client_config;
- server_config_ = server_config;
-}
-
-// sets the QPS
-void PerfDbClient::setQps(double qps) { qps_ = qps; }
-
-// sets the QPS per core
-void PerfDbClient::setQpsPerCore(double qps_per_core) {
- qps_per_core_ = qps_per_core;
-}
-
-// sets the 50th, 90th, 95th, 99th and 99.9th percentile latency
-void PerfDbClient::setLatencies(double perc_lat_50, double perc_lat_90,
- double perc_lat_95, double perc_lat_99,
- double perc_lat_99_point_9) {
- perc_lat_50_ = perc_lat_50;
- perc_lat_90_ = perc_lat_90;
- perc_lat_95_ = perc_lat_95;
- perc_lat_99_ = perc_lat_99;
- perc_lat_99_point_9_ = perc_lat_99_point_9;
-}
-
-// sets the server and client, user and system times
-void PerfDbClient::setTimes(double server_system_time, double server_user_time,
- double client_system_time,
- double client_user_time) {
- server_system_time_ = server_system_time;
- server_user_time_ = server_user_time;
- client_system_time_ = client_system_time;
- client_user_time_ = client_user_time;
-}
-
-// sends the data to the performance database server
-bool PerfDbClient::sendData(std::string hashed_id, std::string test_name,
- std::string sys_info, std::string tag) {
- // Data record request object
- SingleUserRecordRequest single_user_record_request;
-
- // setting access token, name of the test and the system information
- single_user_record_request.set_hashed_id(hashed_id);
- single_user_record_request.set_test_name(test_name);
- single_user_record_request.set_sys_info(sys_info);
- single_user_record_request.set_tag(tag);
-
- // setting configs
- *(single_user_record_request.mutable_client_config()) = client_config_;
- *(single_user_record_request.mutable_server_config()) = server_config_;
-
- Metrics* metrics = single_user_record_request.mutable_metrics();
-
- // setting metrcs in data record request
- if (qps_ != DBL_MIN) {
- metrics->set_qps(qps_);
- }
- if (qps_per_core_ != DBL_MIN) {
- metrics->set_qps_per_core(qps_per_core_);
- }
- if (perc_lat_50_ != DBL_MIN) {
- metrics->set_perc_lat_50(perc_lat_50_);
- }
- if (perc_lat_90_ != DBL_MIN) {
- metrics->set_perc_lat_90(perc_lat_90_);
- }
- if (perc_lat_95_ != DBL_MIN) {
- metrics->set_perc_lat_95(perc_lat_95_);
- }
- if (perc_lat_99_ != DBL_MIN) {
- metrics->set_perc_lat_99(perc_lat_99_);
- }
- if (perc_lat_99_point_9_ != DBL_MIN) {
- metrics->set_perc_lat_99_point_9(perc_lat_99_point_9_);
- }
- if (server_system_time_ != DBL_MIN) {
- metrics->set_server_system_time(server_system_time_);
- }
- if (server_user_time_ != DBL_MIN) {
- metrics->set_server_user_time(server_user_time_);
- }
- if (client_system_time_ != DBL_MIN) {
- metrics->set_client_system_time(client_system_time_);
- }
- if (client_user_time_ != DBL_MIN) {
- metrics->set_client_user_time(client_user_time_);
- }
-
- SingleUserRecordReply single_user_record_reply;
- ClientContext context;
-
- Status status = stub_->RecordSingleClientData(
- &context, single_user_record_request, &single_user_record_reply);
- if (status.ok()) {
- return true; // data sent to database successfully
- } else {
- return false; // error in data sending
- }
-}
-} // testing
-} // grpc
diff --git a/test/cpp/qps/perf_db_client.h b/test/cpp/qps/perf_db_client.h
deleted file mode 100644
index b74c70d86b..0000000000
--- a/test/cpp/qps/perf_db_client.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <cfloat>
-#include <iostream>
-#include <memory>
-#include <string>
-
-#include <grpc++/channel.h>
-#include <grpc++/client_context.h>
-#include <grpc++/create_channel.h>
-#include <grpc++/security/credentials.h>
-#include <grpc++/support/channel_arguments.h>
-#include <grpc/grpc.h>
-#include "src/proto/grpc/testing/perf_db.grpc.pb.h"
-
-namespace grpc {
-namespace testing {
-
-// Manages data sending to performance database server
-class PerfDbClient {
- public:
- PerfDbClient() {
- qps_ = DBL_MIN;
- qps_per_core_ = DBL_MIN;
- perc_lat_50_ = DBL_MIN;
- perc_lat_90_ = DBL_MIN;
- perc_lat_95_ = DBL_MIN;
- perc_lat_99_ = DBL_MIN;
- perc_lat_99_point_9_ = DBL_MIN;
- server_system_time_ = DBL_MIN;
- server_user_time_ = DBL_MIN;
- client_system_time_ = DBL_MIN;
- client_user_time_ = DBL_MIN;
- }
-
- void init(std::shared_ptr<Channel> channel) {
- stub_ = PerfDbTransfer::NewStub(channel);
- }
-
- ~PerfDbClient() {}
-
- // sets the client and server config information
- void setConfigs(const ClientConfig& client_config,
- const ServerConfig& server_config);
-
- // sets the qps
- void setQps(double qps);
-
- // sets the qps per core
- void setQpsPerCore(double qps_per_core);
-
- // sets the 50th, 90th, 95th, 99th and 99.9th percentile latency
- void setLatencies(double perc_lat_50, double perc_lat_90, double perc_lat_95,
- double perc_lat_99, double perc_lat_99_point_9);
-
- // sets the server and client, user and system times
- void setTimes(double server_system_time, double server_user_time,
- double client_system_time, double client_user_time);
-
- // sends the data to the performance database server
- bool sendData(std::string hashed_id, std::string test_name,
- std::string sys_info, std::string tag);
-
- private:
- std::unique_ptr<PerfDbTransfer::Stub> stub_;
- ClientConfig client_config_;
- ServerConfig server_config_;
- double qps_;
- double qps_per_core_;
- double perc_lat_50_;
- double perc_lat_90_;
- double perc_lat_95_;
- double perc_lat_99_;
- double perc_lat_99_point_9_;
- double server_system_time_;
- double server_user_time_;
- double client_system_time_;
- double client_user_time_;
-};
-
-} // namespace testing
-} // namespace grpc
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index d7642f0e1e..f5d739f893 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -34,7 +34,7 @@
#include <memory>
#include <set>
-#include <grpc++/support/config_protobuf.h>
+#include <grpc++/impl/codegen/config_protobuf.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index c3e72d9b17..f94ea0cb49 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -50,8 +50,8 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1000);
- client_config.set_client_channels(8);
+ client_config.set_outstanding_rpcs_per_channel(100);
+ client_config.set_client_channels(64);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 3ae41399cf..2ec7d8676c 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -35,11 +35,9 @@
#include <fstream>
-#include <google/protobuf/util/json_util.h>
-#include <google/protobuf/util/type_resolver_util.h>
-
#include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/parse_json.h"
#include "test/cpp/qps/stats.h"
namespace grpc {
@@ -104,18 +102,8 @@ void GprLogReporter::ReportTimes(const ScenarioResult& result) {
}
void JsonReporter::ReportQPS(const ScenarioResult& result) {
- std::unique_ptr<google::protobuf::util::TypeResolver> type_resolver(
- google::protobuf::util::NewTypeResolverForDescriptorPool(
- "type.googleapis.com",
- google::protobuf::DescriptorPool::generated_pool()));
- grpc::string binary;
- grpc::string json_string;
- result.SerializeToString(&binary);
- auto status = BinaryToJsonString(
- type_resolver.get(), "type.googleapis.com/grpc.testing.ScenarioResult",
- binary, &json_string);
- GPR_ASSERT(status.ok());
-
+ grpc::string json_string =
+ SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
std::ofstream output_file(report_file_);
output_file << json_string;
output_file.close();
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 8f04d84124..39cf498e7b 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -41,7 +41,6 @@
#include <grpc++/support/config.h>
#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/perf_db_client.h"
namespace grpc {
namespace testing {
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index a68f1ae7b6..c9954d0d02 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -37,7 +37,6 @@
#include <mutex>
#include <thread>
-#include <gflags/gflags.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
@@ -48,7 +47,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
-#include <gtest/gtest.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/core/util/test_config.h"
@@ -73,7 +71,8 @@ class AsyncQpsServerTest : public Server {
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
std::function<grpc::Status(const PayloadConfig &, const RequestType *,
- ResponseType *)> process_rpc)
+ ResponseType *)>
+ process_rpc)
: Server(config) {
char *server_address = NULL;
@@ -130,10 +129,10 @@ class AsyncQpsServerTest : public Server {
}
}
~AsyncQpsServerTest() {
- server_->Shutdown();
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
(*ss)->set_shutdown();
}
+ server_->Shutdown();
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
@@ -190,7 +189,8 @@ class AsyncQpsServerTest : public Server {
ServerRpcContextUnaryImpl(
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType> *,
- void *)> request_method,
+ void *)>
+ request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index 9e64f470bf..c774985bfa 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -33,7 +33,6 @@
#include <thread>
-#include <gflags/gflags.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc
deleted file mode 100644
index 67c62f4bae..0000000000
--- a/test/cpp/qps/sync_streaming_ping_pong_test.cc
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <set>
-
-#include <grpc/support/log.h>
-
-#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/report.h"
-#include "test/cpp/util/benchmark_config.h"
-
-namespace grpc {
-namespace testing {
-
-static const int WARMUP = 5;
-static const int BENCHMARK = 5;
-
-static void RunSynchronousStreamingPingPong() {
- gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong");
-
- ClientConfig client_config;
- client_config.set_client_type(SYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1);
- client_config.set_client_channels(1);
- client_config.set_rpc_type(STREAMING);
- client_config.mutable_load_params()->mutable_closed_loop();
-
- ServerConfig server_config;
- server_config.set_server_type(SYNC_SERVER);
-
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
-
- GetReporter()->ReportQPS(*result);
- GetReporter()->ReportLatency(*result);
-}
-} // namespace testing
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
-
- grpc::testing::RunSynchronousStreamingPingPong();
-
- return 0;
-}
diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc
deleted file mode 100644
index aa0c0c3013..0000000000
--- a/test/cpp/qps/sync_unary_ping_pong_test.cc
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <set>
-
-#include <grpc/support/log.h>
-
-#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/report.h"
-#include "test/cpp/util/benchmark_config.h"
-
-namespace grpc {
-namespace testing {
-
-static const int WARMUP = 5;
-static const int BENCHMARK = 5;
-
-static void RunSynchronousUnaryPingPong() {
- gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
-
- ClientConfig client_config;
- client_config.set_client_type(SYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(1);
- client_config.set_client_channels(1);
- client_config.set_rpc_type(UNARY);
- client_config.mutable_load_params()->mutable_closed_loop();
-
- ServerConfig server_config;
- server_config.set_server_type(SYNC_SERVER);
-
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
-
- GetReporter()->ReportQPS(*result);
- GetReporter()->ReportLatency(*result);
-}
-
-} // namespace testing
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
-
- grpc::testing::RunSynchronousUnaryPingPong();
-
- return 0;
-}
diff --git a/test/cpp/util/byte_buffer_proto_helper.h b/test/cpp/util/byte_buffer_proto_helper.h
index 42cea59e33..007723c691 100644
--- a/test/cpp/util/byte_buffer_proto_helper.h
+++ b/test/cpp/util/byte_buffer_proto_helper.h
@@ -36,8 +36,8 @@
#include <memory>
+#include <grpc++/impl/codegen/config_protobuf.h>
#include <grpc++/support/byte_buffer.h>
-#include <grpc++/support/config_protobuf.h>
namespace grpc {
namespace testing {
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index 99fad7f2fe..98b9d930d6 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -86,7 +86,6 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
cq.Next(&got_tag, &ok);
if (!ok) {
std::cout << "Failed to read response." << std::endl;
- return Status(StatusCode::INTERNAL, "Failed to read response");
}
grpc::Status status;
call->Finish(&status, tag(5));
@@ -103,6 +102,7 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
slices[i].size());
}
}
+
*server_initial_metadata = ctx.GetServerInitialMetadata();
*server_trailing_metadata = ctx.GetServerTrailingMetadata();
return status;
diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc
index 68cf4114a8..c52e48bae6 100644
--- a/test/cpp/util/grpc_cli.cc
+++ b/test/cpp/util/grpc_cli.cc
@@ -32,32 +32,33 @@
*/
/*
- A command line tool to talk to any grpc server.
+ A command line tool to talk to a grpc server.
Example of talking to grpc interop server:
- 1. Prepare request binary file:
- a. create a text file input.txt, containing the following:
- response_size: 10
- payload: {
- body: "hello world"
- }
- b. under grpc/ run
- protoc --proto_path=src/proto/grpc/testing/ \
- --encode=grpc.testing.SimpleRequest
- src/proto/grpc/testing/messages.proto \
- < input.txt > input.bin
- 2. Start a server
- make interop_server && bins/opt/interop_server --port=50051
- 3. Run the tool
- make grpc_cli && bins/opt/grpc_cli call localhost:50051 \
- /grpc.testing.TestService/UnaryCall --enable_ssl=false \
- --input_binary_file=input.bin --output_binary_file=output.bin
- 4. Decode response
- protoc --proto_path=src/proto/grpc/testing/ \
- --decode=grpc.testing.SimpleResponse src/proto/grpc/testing/messages.proto \
- < output.bin > output.txt
- 5. Now the text form of response should be in output.txt
- Optionally, metadata can be passed to server via flag --metadata, e.g.
- --metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2"
+ grpc_cli call localhost:50051 UnaryCall src/proto/grpc/testing/test.proto \
+ "response_size:10" --enable_ssl=false
+
+ Options:
+ 1. --proto_path, if your proto file is not under current working directory,
+ use this flag to provide a search root. It should work similar to the
+ counterpart in protoc.
+ 2. --metadata specifies metadata to be sent to the server, such as:
+ --metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2"
+ 3. --enable_ssl, whether to use tls.
+ 4. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call
+ 3. --input_binary_file, a file containing the serialized request. The file
+ can be generated by calling something like:
+ protoc --proto_path=src/proto/grpc/testing/ \
+ --encode=grpc.testing.SimpleRequest \
+ src/proto/grpc/testing/messages.proto \
+ < input.txt > input.bin
+ If this is used and no proto file is provided in the argument list, the
+ method string has to be exact in the form of /package.service/method.
+ 4. --output_binary_file, a file to write binary format response into, it can
+ be later decoded using protoc:
+ protoc --proto_path=src/proto/grpc/testing/ \
+ --decode=grpc.testing.SimpleResponse \
+ src/proto/grpc/testing/messages.proto \
+ < output.bin > output.txt
*/
#include <fstream>
@@ -72,6 +73,7 @@
#include <grpc/grpc.h>
#include "test/cpp/util/cli_call.h"
+#include "test/cpp/util/proto_file_parser.h"
#include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_config.h"
@@ -79,10 +81,11 @@ DEFINE_bool(enable_ssl, true, "Whether to use ssl/tls.");
DEFINE_bool(use_auth, false, "Whether to create default google credentials.");
DEFINE_string(input_binary_file, "",
"Path to input file containing serialized request.");
-DEFINE_string(output_binary_file, "output.bin",
+DEFINE_string(output_binary_file, "",
"Path to output file to write serialized response.");
DEFINE_string(metadata, "",
"Metadata to send to server, in the form of key1:val1:key2:val2");
+DEFINE_string(proto_path, ".", "Path to look for the proto file.");
void ParseMetadataFlag(
std::multimap<grpc::string, grpc::string>* client_metadata) {
@@ -126,28 +129,51 @@ void PrintMetadata(const T& m, const grpc::string& message) {
int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
- if (argc < 4 || grpc::string(argv[1]) != "call") {
- std::cout << "Usage: grpc_cli call server_host:port full_method_string\n"
- << "Example: grpc_cli call service.googleapis.com "
- << "/grpc.testing.TestService/UnaryCall "
- << "--input_binary_file=input.bin --output_binary_file=output.bin"
- << std::endl;
+ if (argc < 4 || argc == 5 || grpc::string(argv[1]) != "call") {
+ std::cout << "Usage: grpc_cli call server_host:port method_name "
+ << "[proto file] [text format request] [<options>]" << std::endl;
}
+
+ grpc::string file_name;
+ grpc::string request_text;
grpc::string server_address(argv[2]);
- // TODO(yangg) basic check of method string
- grpc::string method(argv[3]);
+ grpc::string method_name(argv[3]);
+ std::unique_ptr<grpc::testing::ProtoFileParser> parser;
+ grpc::string serialized_request_proto;
- if (FLAGS_input_binary_file.empty()) {
- std::cout << "Missing --input_binary_file for serialized request."
- << std::endl;
+ if (argc == 6) {
+ file_name = argv[4];
+ // TODO(yangg) read from stdin as well?
+ request_text = argv[5];
+ }
+
+ if (request_text.empty() && FLAGS_input_binary_file.empty()) {
+ std::cout << "Missing input. Use text format input or "
+ << "--input_binary_file for serialized request" << std::endl;
return 1;
+ } else if (!request_text.empty()) {
+ parser.reset(new grpc::testing::ProtoFileParser(FLAGS_proto_path, file_name,
+ method_name));
+ method_name = parser->GetFullMethodName();
+ if (parser->HasError()) {
+ return 1;
+ }
}
- std::cout << "connecting to " << server_address << std::endl;
- std::ifstream input_file(FLAGS_input_binary_file,
- std::ios::in | std::ios::binary);
- std::stringstream input_stream;
- input_stream << input_file.rdbuf();
+ if (parser) {
+ serialized_request_proto =
+ parser->GetSerializedProto(request_text, true /* is_request */);
+ if (parser->HasError()) {
+ return 1;
+ }
+ } else if (!FLAGS_input_binary_file.empty()) {
+ std::ifstream input_file(FLAGS_input_binary_file,
+ std::ios::in | std::ios::binary);
+ std::stringstream input_stream;
+ input_stream << input_file.rdbuf();
+ serialized_request_proto = input_stream.str();
+ }
+ std::cout << "connecting to " << server_address << std::endl;
std::shared_ptr<grpc::ChannelCredentials> creds;
if (!FLAGS_enable_ssl) {
@@ -162,25 +188,34 @@ int main(int argc, char** argv) {
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(server_address, creds);
- grpc::string response;
+ grpc::string serialized_response_proto;
std::multimap<grpc::string, grpc::string> client_metadata;
std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
server_trailing_metadata;
ParseMetadataFlag(&client_metadata);
PrintMetadata(client_metadata, "Sending client initial metadata:");
grpc::Status s = grpc::testing::CliCall::Call(
- channel, method, input_stream.str(), &response, client_metadata,
- &server_initial_metadata, &server_trailing_metadata);
+ channel, method_name, serialized_request_proto,
+ &serialized_response_proto, client_metadata, &server_initial_metadata,
+ &server_trailing_metadata);
PrintMetadata(server_initial_metadata,
"Received initial metadata from server:");
PrintMetadata(server_trailing_metadata,
"Received trailing metadata from server:");
if (s.ok()) {
std::cout << "Rpc succeeded with OK status" << std::endl;
- if (!response.empty()) {
+ if (parser) {
+ grpc::string response_text = parser->GetTextFormat(
+ serialized_response_proto, false /* is_request */);
+ if (parser->HasError()) {
+ return 1;
+ }
+ std::cout << "Response: \n " << response_text << std::endl;
+ }
+ if (!FLAGS_output_binary_file.empty()) {
std::ofstream output_file(FLAGS_output_binary_file,
std::ios::trunc | std::ios::binary);
- output_file << response;
+ output_file << serialized_response_proto;
}
} else {
std::cout << "Rpc failed with status code " << s.error_code()
diff --git a/test/cpp/util/metrics_server.cc b/test/cpp/util/metrics_server.cc
index d9b44a6a92..cc6b39b753 100644
--- a/test/cpp/util/metrics_server.cc
+++ b/test/cpp/util/metrics_server.cc
@@ -42,16 +42,26 @@
namespace grpc {
namespace testing {
-Gauge::Gauge(long initial_val) : val_(initial_val) {}
+QpsGauge::QpsGauge()
+ : start_time_(gpr_now(GPR_CLOCK_REALTIME)), num_queries_(0) {}
-void Gauge::Set(long new_val) {
- std::lock_guard<std::mutex> lock(val_mu_);
- val_ = new_val;
+void QpsGauge::Reset() {
+ std::lock_guard<std::mutex> lock(num_queries_mu_);
+ num_queries_ = 0;
+ start_time_ = gpr_now(GPR_CLOCK_REALTIME);
}
-long Gauge::Get() {
- std::lock_guard<std::mutex> lock(val_mu_);
- return val_;
+void QpsGauge::Incr() {
+ std::lock_guard<std::mutex> lock(num_queries_mu_);
+ num_queries_++;
+}
+
+long QpsGauge::Get() {
+ std::lock_guard<std::mutex> lock(num_queries_mu_);
+ gpr_timespec time_diff =
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time_);
+ long duration_secs = time_diff.tv_sec > 0 ? time_diff.tv_sec : 1;
+ return num_queries_ / duration_secs;
}
grpc::Status MetricsServiceImpl::GetAllGauges(
@@ -60,7 +70,7 @@ grpc::Status MetricsServiceImpl::GetAllGauges(
gpr_log(GPR_DEBUG, "GetAllGauges called");
std::lock_guard<std::mutex> lock(mu_);
- for (auto it = gauges_.begin(); it != gauges_.end(); it++) {
+ for (auto it = qps_gauges_.begin(); it != qps_gauges_.end(); it++) {
GaugeResponse resp;
resp.set_name(it->first); // Gauge name
resp.set_long_value(it->second->Get()); // Gauge value
@@ -75,8 +85,8 @@ grpc::Status MetricsServiceImpl::GetGauge(ServerContext* context,
GaugeResponse* response) {
std::lock_guard<std::mutex> lock(mu_);
- const auto it = gauges_.find(request->name());
- if (it != gauges_.end()) {
+ const auto it = qps_gauges_.find(request->name());
+ if (it != qps_gauges_.end()) {
response->set_name(it->first);
response->set_long_value(it->second->Get());
}
@@ -84,16 +94,17 @@ grpc::Status MetricsServiceImpl::GetGauge(ServerContext* context,
return Status::OK;
}
-std::shared_ptr<Gauge> MetricsServiceImpl::CreateGauge(const grpc::string& name,
- bool* already_present) {
+std::shared_ptr<QpsGauge> MetricsServiceImpl::CreateQpsGauge(
+ const grpc::string& name, bool* already_present) {
std::lock_guard<std::mutex> lock(mu_);
- std::shared_ptr<Gauge> gauge(new Gauge(0));
- const auto p = gauges_.emplace(name, gauge);
+ std::shared_ptr<QpsGauge> qps_gauge(new QpsGauge());
+ const auto p = qps_gauges_.emplace(name, qps_gauge);
- // p.first is an iterator pointing to <name, shared_ptr<Gauge>> pair. p.second
- // is a boolean which is set to 'true' if the Gauge is inserted in the guages_
- // map and 'false' if it is already present in the map
+ // p.first is an iterator pointing to <name, shared_ptr<QpsGauge>> pair.
+ // p.second is a boolean which is set to 'true' if the QpsGauge is
+ // successfully inserted in the guages_ map and 'false' if it is already
+ // present in the map
*already_present = !p.second;
return p.first->second;
}
diff --git a/test/cpp/util/metrics_server.h b/test/cpp/util/metrics_server.h
index ce05e0be64..aa9bfed23d 100644
--- a/test/cpp/util/metrics_server.h
+++ b/test/cpp/util/metrics_server.h
@@ -48,10 +48,13 @@
* Example:
* MetricsServiceImpl metricsImpl;
* ..
- * // Create Gauge(s). Note: Gauges can be created even after calling
+ * // Create QpsGauge(s). Note: QpsGauges can be created even after calling
* // 'StartServer'.
- * Gauge gauge1 = metricsImpl.CreateGauge("foo",is_present);
- * // gauge1 can now be used anywhere in the program to set values.
+ * QpsGauge qps_gauge1 = metricsImpl.CreateQpsGauge("foo", is_present);
+ * // qps_gauge1 can now be used anywhere in the program by first making a
+ * // one-time call qps_gauge1.Reset() and then calling qps_gauge1.Incr()
+ * // every time to increment a query counter
+ *
* ...
* // Create the metrics server
* std::unique_ptr<grpc::Server> server = metricsImpl.StartServer(port);
@@ -60,17 +63,24 @@
namespace grpc {
namespace testing {
-// TODO(sreek): Add support for other types of Gauges like Double, String in
-// future
-class Gauge {
+class QpsGauge {
public:
- Gauge(long initial_val);
- void Set(long new_val);
+ QpsGauge();
+
+ // Initialize the internal timer and reset the query count to 0
+ void Reset();
+
+ // Increment the query count by 1
+ void Incr();
+
+ // Return the current qps (i.e query count divided by the time since this
+ // QpsGauge object created (or Reset() was called))
long Get();
private:
- long val_;
- std::mutex val_mu_;
+ gpr_timespec start_time_;
+ long num_queries_;
+ std::mutex num_queries_mu_;
};
class MetricsServiceImpl GRPC_FINAL : public MetricsService::Service {
@@ -81,17 +91,17 @@ class MetricsServiceImpl GRPC_FINAL : public MetricsService::Service {
grpc::Status GetGauge(ServerContext* context, const GaugeRequest* request,
GaugeResponse* response) GRPC_OVERRIDE;
- // Create a Gauge with name 'name'. is_present is set to true if the Gauge
+ // Create a QpsGauge with name 'name'. is_present is set to true if the Gauge
// is already present in the map.
- // NOTE: CreateGauge can be called anytime (i.e before or after calling
+ // NOTE: CreateQpsGauge can be called anytime (i.e before or after calling
// StartServer).
- std::shared_ptr<Gauge> CreateGauge(const grpc::string& name,
- bool* already_present);
+ std::shared_ptr<QpsGauge> CreateQpsGauge(const grpc::string& name,
+ bool* already_present);
std::unique_ptr<grpc::Server> StartServer(int port);
private:
- std::map<string, std::shared_ptr<Gauge>> gauges_;
+ std::map<string, std::shared_ptr<QpsGauge>> qps_gauges_;
std::mutex mu_;
};
diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc
new file mode 100644
index 0000000000..25aec329eb
--- /dev/null
+++ b/test/cpp/util/proto_file_parser.cc
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/cpp/util/proto_file_parser.h"
+
+#include <algorithm>
+#include <iostream>
+#include <sstream>
+
+#include <google/protobuf/text_format.h>
+#include <grpc++/support/config.h>
+
+namespace grpc {
+namespace testing {
+namespace {
+
+// Match the user input method string to the full_name from method descriptor.
+bool MethodNameMatch(const grpc::string& full_name, const grpc::string& input) {
+ grpc::string clean_input = input;
+ std::replace(clean_input.begin(), clean_input.end(), '/', '.');
+ if (clean_input.size() > full_name.size()) {
+ return false;
+ }
+ return full_name.compare(full_name.size() - clean_input.size(),
+ clean_input.size(), clean_input) == 0;
+}
+} // namespace
+
+class ErrorPrinter
+ : public google::protobuf::compiler::MultiFileErrorCollector {
+ public:
+ explicit ErrorPrinter(ProtoFileParser* parser) : parser_(parser) {}
+
+ void AddError(const grpc::string& filename, int line, int column,
+ const grpc::string& message) GRPC_OVERRIDE {
+ std::ostringstream oss;
+ oss << "error " << filename << " " << line << " " << column << " "
+ << message << "\n";
+ parser_->LogError(oss.str());
+ }
+
+ void AddWarning(const grpc::string& filename, int line, int column,
+ const grpc::string& message) GRPC_OVERRIDE {
+ std::cout << "warning " << filename << " " << line << " " << column << " "
+ << message << std::endl;
+ }
+
+ private:
+ ProtoFileParser* parser_; // not owned
+};
+
+ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
+ const grpc::string& file_name,
+ const grpc::string& method)
+ : has_error_(false) {
+ source_tree_.MapPath("", proto_path);
+ error_printer_.reset(new ErrorPrinter(this));
+ importer_.reset(new google::protobuf::compiler::Importer(
+ &source_tree_, error_printer_.get()));
+ const auto* file_desc = importer_->Import(file_name);
+ if (!file_desc) {
+ LogError("");
+ return;
+ }
+ dynamic_factory_.reset(
+ new google::protobuf::DynamicMessageFactory(importer_->pool()));
+
+ const google::protobuf::MethodDescriptor* method_descriptor = nullptr;
+ for (int i = 0; !method_descriptor && i < file_desc->service_count(); i++) {
+ const auto* service_desc = file_desc->service(i);
+ for (int j = 0; j < service_desc->method_count(); j++) {
+ const auto* method_desc = service_desc->method(j);
+ if (MethodNameMatch(method_desc->full_name(), method)) {
+ if (method_descriptor) {
+ std::ostringstream error_stream("Ambiguous method names: ");
+ error_stream << method_descriptor->full_name() << " ";
+ error_stream << method_desc->full_name();
+ LogError(error_stream.str());
+ }
+ method_descriptor = method_desc;
+ }
+ }
+ }
+ if (!method_descriptor) {
+ LogError("Method name not found");
+ }
+ if (has_error_) {
+ return;
+ }
+ full_method_name_ = method_descriptor->full_name();
+ size_t last_dot = full_method_name_.find_last_of('.');
+ if (last_dot != grpc::string::npos) {
+ full_method_name_[last_dot] = '/';
+ }
+ full_method_name_.insert(full_method_name_.begin(), '/');
+
+ request_prototype_.reset(
+ dynamic_factory_->GetPrototype(method_descriptor->input_type())->New());
+ response_prototype_.reset(
+ dynamic_factory_->GetPrototype(method_descriptor->output_type())->New());
+}
+
+ProtoFileParser::~ProtoFileParser() {}
+
+grpc::string ProtoFileParser::GetSerializedProto(
+ const grpc::string& text_format_proto, bool is_request) {
+ grpc::string serialized;
+ grpc::protobuf::Message* msg =
+ is_request ? request_prototype_.get() : response_prototype_.get();
+ bool ok =
+ google::protobuf::TextFormat::ParseFromString(text_format_proto, msg);
+ if (!ok) {
+ LogError("Failed to parse text format to proto.");
+ return "";
+ }
+ ok = request_prototype_->SerializeToString(&serialized);
+ if (!ok) {
+ LogError("Failed to serialize proto.");
+ return "";
+ }
+ return serialized;
+}
+
+grpc::string ProtoFileParser::GetTextFormat(
+ const grpc::string& serialized_proto, bool is_request) {
+ grpc::protobuf::Message* msg =
+ is_request ? request_prototype_.get() : response_prototype_.get();
+ if (!msg->ParseFromString(serialized_proto)) {
+ LogError("Failed to deserialize proto.");
+ return "";
+ }
+ grpc::string text_format;
+ if (!google::protobuf::TextFormat::PrintToString(*msg, &text_format)) {
+ LogError("Failed to print proto message to text format");
+ return "";
+ }
+ return text_format;
+}
+
+void ProtoFileParser::LogError(const grpc::string& error_msg) {
+ if (!error_msg.empty()) {
+ std::cout << error_msg << std::endl;
+ }
+ has_error_ = true;
+}
+
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h
new file mode 100644
index 0000000000..46cdd66503
--- /dev/null
+++ b/test/cpp/util/proto_file_parser.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_TEST_CPP_UTIL_PROTO_FILE_PARSER_H
+#define GRPC_TEST_CPP_UTIL_PROTO_FILE_PARSER_H
+
+#include <memory>
+
+#include <google/protobuf/compiler/importer.h>
+#include <google/protobuf/dynamic_message.h>
+
+#include "src/compiler/config.h"
+
+namespace grpc {
+namespace testing {
+class ErrorPrinter;
+
+// Find method and associated request/response types.
+class ProtoFileParser {
+ public:
+ // The given proto file_name will be searched in a source tree rooted from
+ // proto_path. The method could be a partial string such as Service.Method or
+ // even just Method. It will log an error if there is ambiguity.
+ ProtoFileParser(const grpc::string& proto_path, const grpc::string& file_name,
+ const grpc::string& method);
+ ~ProtoFileParser();
+
+ grpc::string GetFullMethodName() const { return full_method_name_; }
+
+ grpc::string GetSerializedProto(const grpc::string& text_format_proto,
+ bool is_request);
+
+ grpc::string GetTextFormat(const grpc::string& serialized_proto,
+ bool is_request);
+
+ bool HasError() const { return has_error_; }
+
+ void LogError(const grpc::string& error_msg);
+
+ private:
+ bool has_error_;
+ grpc::string request_text_;
+ grpc::string full_method_name_;
+ google::protobuf::compiler::DiskSourceTree source_tree_;
+ std::unique_ptr<ErrorPrinter> error_printer_;
+ std::unique_ptr<google::protobuf::compiler::Importer> importer_;
+ std::unique_ptr<google::protobuf::DynamicMessageFactory> dynamic_factory_;
+ std::unique_ptr<grpc::protobuf::Message> request_prototype_;
+ std::unique_ptr<grpc::protobuf::Message> response_prototype_;
+};
+
+} // namespace testing
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_UTIL_PROTO_FILE_PARSER_H
diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc
new file mode 100644
index 0000000000..6907d97bd5
--- /dev/null
+++ b/test/cpp/util/proto_reflection_descriptor_database.cc
@@ -0,0 +1,316 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/cpp/util/proto_reflection_descriptor_database.h"
+
+#include <vector>
+
+#include <grpc/support/log.h>
+
+using grpc::reflection::v1alpha::ServerReflection;
+using grpc::reflection::v1alpha::ServerReflectionRequest;
+using grpc::reflection::v1alpha::ServerReflectionResponse;
+using grpc::reflection::v1alpha::ListServiceResponse;
+using grpc::reflection::v1alpha::ErrorResponse;
+
+namespace grpc {
+
+ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
+ std::unique_ptr<ServerReflection::Stub> stub)
+ : stub_(std::move(stub)) {}
+
+ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
+ std::shared_ptr<grpc::Channel> channel)
+ : stub_(ServerReflection::NewStub(channel)) {}
+
+ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {}
+
+bool ProtoReflectionDescriptorDatabase::FindFileByName(
+ const string& filename, google::protobuf::FileDescriptorProto* output) {
+ if (cached_db_.FindFileByName(filename, output)) {
+ return true;
+ }
+
+ if (known_files_.find(filename) != known_files_.end()) {
+ return false;
+ }
+
+ ServerReflectionRequest request;
+ request.set_file_by_filename(filename);
+ ServerReflectionResponse response;
+
+ DoOneRequest(request, response);
+
+ if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kFileDescriptorResponse) {
+ AddFileFromResponse(response.file_descriptor_response());
+ } else if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kErrorResponse) {
+ const ErrorResponse error = response.error_response();
+ if (error.error_code() == StatusCode::NOT_FOUND) {
+ gpr_log(GPR_INFO, "NOT_FOUND from server for FindFileByName(%s)",
+ filename.c_str());
+ } else {
+ gpr_log(GPR_INFO,
+ "Error on FindFileByName(%s)\n\tError code: %d\n"
+ "\tError Message: %s",
+ filename.c_str(), error.error_code(),
+ error.error_message().c_str());
+ }
+ } else {
+ gpr_log(
+ GPR_INFO,
+ "Error on FindFileByName(%s) response type\n"
+ "\tExpecting: %d\n\tReceived: %d",
+ filename.c_str(),
+ ServerReflectionResponse::MessageResponseCase::kFileDescriptorResponse,
+ response.message_response_case());
+ }
+
+ return cached_db_.FindFileByName(filename, output);
+}
+
+bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
+ const string& symbol_name, google::protobuf::FileDescriptorProto* output) {
+ if (cached_db_.FindFileContainingSymbol(symbol_name, output)) {
+ return true;
+ }
+
+ if (missing_symbols_.find(symbol_name) != missing_symbols_.end()) {
+ return false;
+ }
+
+ ServerReflectionRequest request;
+ request.set_file_containing_symbol(symbol_name);
+ ServerReflectionResponse response;
+
+ DoOneRequest(request, response);
+
+ if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kFileDescriptorResponse) {
+ AddFileFromResponse(response.file_descriptor_response());
+ } else if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kErrorResponse) {
+ const ErrorResponse error = response.error_response();
+ if (error.error_code() == StatusCode::NOT_FOUND) {
+ missing_symbols_.insert(symbol_name);
+ gpr_log(GPR_INFO,
+ "NOT_FOUND from server for FindFileContainingSymbol(%s)",
+ symbol_name.c_str());
+ } else {
+ gpr_log(GPR_INFO,
+ "Error on FindFileContainingSymbol(%s)\n"
+ "\tError code: %d\n\tError Message: %s",
+ symbol_name.c_str(), error.error_code(),
+ error.error_message().c_str());
+ }
+ } else {
+ gpr_log(
+ GPR_INFO,
+ "Error on FindFileContainingSymbol(%s) response type\n"
+ "\tExpecting: %d\n\tReceived: %d",
+ symbol_name.c_str(),
+ ServerReflectionResponse::MessageResponseCase::kFileDescriptorResponse,
+ response.message_response_case());
+ }
+ return cached_db_.FindFileContainingSymbol(symbol_name, output);
+}
+
+bool ProtoReflectionDescriptorDatabase::FindFileContainingExtension(
+ const string& containing_type, int field_number,
+ google::protobuf::FileDescriptorProto* output) {
+ if (cached_db_.FindFileContainingExtension(containing_type, field_number,
+ output)) {
+ return true;
+ }
+
+ if (missing_extensions_.find(containing_type) != missing_extensions_.end() &&
+ missing_extensions_[containing_type].find(field_number) !=
+ missing_extensions_[containing_type].end()) {
+ gpr_log(GPR_INFO, "nested map.");
+ return false;
+ }
+
+ ServerReflectionRequest request;
+ request.mutable_file_containing_extension()->set_containing_type(
+ containing_type);
+ request.mutable_file_containing_extension()->set_extension_number(
+ field_number);
+ ServerReflectionResponse response;
+
+ DoOneRequest(request, response);
+
+ if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kFileDescriptorResponse) {
+ AddFileFromResponse(response.file_descriptor_response());
+ } else if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kErrorResponse) {
+ const ErrorResponse error = response.error_response();
+ if (error.error_code() == StatusCode::NOT_FOUND) {
+ if (missing_extensions_.find(containing_type) ==
+ missing_extensions_.end()) {
+ missing_extensions_[containing_type] = {};
+ }
+ missing_extensions_[containing_type].insert(field_number);
+ gpr_log(GPR_INFO,
+ "NOT_FOUND from server for FindFileContainingExtension(%s, %d)",
+ containing_type.c_str(), field_number);
+ } else {
+ gpr_log(GPR_INFO,
+ "Error on FindFileContainingExtension(%s, %d)\n"
+ "\tError code: %d\n\tError Message: %s",
+ containing_type.c_str(), field_number, error.error_code(),
+ error.error_message().c_str());
+ }
+ } else {
+ gpr_log(
+ GPR_INFO,
+ "Error on FindFileContainingExtension(%s, %d) response type\n"
+ "\tExpecting: %d\n\tReceived: %d",
+ containing_type.c_str(), field_number,
+ ServerReflectionResponse::MessageResponseCase::kFileDescriptorResponse,
+ response.message_response_case());
+ }
+
+ return cached_db_.FindFileContainingExtension(containing_type, field_number,
+ output);
+}
+
+bool ProtoReflectionDescriptorDatabase::FindAllExtensionNumbers(
+ const string& extendee_type, std::vector<int>* output) {
+ if (cached_extension_numbers_.find(extendee_type) !=
+ cached_extension_numbers_.end()) {
+ *output = cached_extension_numbers_[extendee_type];
+ return true;
+ }
+
+ ServerReflectionRequest request;
+ request.set_all_extension_numbers_of_type(extendee_type);
+ ServerReflectionResponse response;
+
+ DoOneRequest(request, response);
+
+ if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::
+ kAllExtensionNumbersResponse) {
+ auto number = response.all_extension_numbers_response().extension_number();
+ *output = std::vector<int>(number.begin(), number.end());
+ cached_extension_numbers_[extendee_type] = *output;
+ return true;
+ } else if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kErrorResponse) {
+ const ErrorResponse error = response.error_response();
+ if (error.error_code() == StatusCode::NOT_FOUND) {
+ gpr_log(GPR_INFO, "NOT_FOUND from server for FindAllExtensionNumbers(%s)",
+ extendee_type.c_str());
+ } else {
+ gpr_log(GPR_INFO,
+ "Error on FindAllExtensionNumbersExtension(%s)\n"
+ "\tError code: %d\n\tError Message: %s",
+ extendee_type.c_str(), error.error_code(),
+ error.error_message().c_str());
+ }
+ }
+ return false;
+}
+
+bool ProtoReflectionDescriptorDatabase::GetServices(
+ std::vector<std::string>* output) {
+ ServerReflectionRequest request;
+ request.set_list_services("");
+ ServerReflectionResponse response;
+
+ DoOneRequest(request, response);
+
+ if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kListServicesResponse) {
+ const ListServiceResponse ls_response = response.list_services_response();
+ for (int i = 0; i < ls_response.service_size(); ++i) {
+ (*output).push_back(ls_response.service(i).name());
+ }
+ return true;
+ } else if (response.message_response_case() ==
+ ServerReflectionResponse::MessageResponseCase::kErrorResponse) {
+ const ErrorResponse error = response.error_response();
+ gpr_log(GPR_INFO,
+ "Error on GetServices()\n\tError code: %d\n"
+ "\tError Message: %s",
+ error.error_code(), error.error_message().c_str());
+ } else {
+ gpr_log(
+ GPR_INFO,
+ "Error on GetServices() response type\n\tExpecting: %d\n\tReceived: %d",
+ ServerReflectionResponse::MessageResponseCase::kListServicesResponse,
+ response.message_response_case());
+ }
+ return false;
+}
+
+const google::protobuf::FileDescriptorProto
+ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
+ const std::string& byte_fd_proto) {
+ google::protobuf::FileDescriptorProto file_desc_proto;
+ file_desc_proto.ParseFromString(byte_fd_proto);
+ return file_desc_proto;
+}
+
+void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
+ const grpc::reflection::v1alpha::FileDescriptorResponse& response) {
+ for (int i = 0; i < response.file_descriptor_proto_size(); ++i) {
+ const google::protobuf::FileDescriptorProto file_proto =
+ ParseFileDescriptorProtoResponse(response.file_descriptor_proto(i));
+ if (known_files_.find(file_proto.name()) == known_files_.end()) {
+ known_files_.insert(file_proto.name());
+ cached_db_.Add(file_proto);
+ }
+ }
+}
+
+const std::shared_ptr<ProtoReflectionDescriptorDatabase::ClientStream>
+ProtoReflectionDescriptorDatabase::GetStream() {
+ if (stream_ == nullptr) {
+ stream_ = stub_->ServerReflectionInfo(&ctx_);
+ }
+ return stream_;
+}
+
+void ProtoReflectionDescriptorDatabase::DoOneRequest(
+ const ServerReflectionRequest& request,
+ ServerReflectionResponse& response) {
+ stream_mutex_.lock();
+ GetStream()->Write(request);
+ GetStream()->Read(&response);
+ stream_mutex_.unlock();
+}
+
+} // namespace grpc
diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h
new file mode 100644
index 0000000000..99c00675bb
--- /dev/null
+++ b/test/cpp/util/proto_reflection_descriptor_database.h
@@ -0,0 +1,131 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+#ifndef GRPC_TEST_CPP_PROTO_SERVER_REFLECTION_DATABSE_H
+#define GRPC_TEST_CPP_PROTO_SERVER_REFLECTION_DATABSE_H
+
+#include <mutex>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/descriptor_database.h>
+#include <grpc++/ext/reflection.grpc.pb.h>
+#include <grpc++/grpc++.h>
+
+namespace grpc {
+
+// ProtoReflectionDescriptorDatabase takes a stub of ServerReflection and
+// provides the methods defined by DescriptorDatabase interfaces. It can be used
+// to feed a DescriptorPool instance.
+class ProtoReflectionDescriptorDatabase
+ : public google::protobuf::DescriptorDatabase {
+ public:
+ explicit ProtoReflectionDescriptorDatabase(
+ std::unique_ptr<reflection::v1alpha::ServerReflection::Stub> stub);
+
+ explicit ProtoReflectionDescriptorDatabase(
+ std::shared_ptr<grpc::Channel> channel);
+
+ virtual ~ProtoReflectionDescriptorDatabase();
+
+ // The following four methods implement DescriptorDatabase interfaces.
+ //
+ // Find a file by file name. Fills in in *output and returns true if found.
+ // Otherwise, returns false, leaving the contents of *output undefined.
+ bool FindFileByName(const string& filename,
+ google::protobuf::FileDescriptorProto* output)
+ GRPC_OVERRIDE;
+
+ // Find the file that declares the given fully-qualified symbol name.
+ // If found, fills in *output and returns true, otherwise returns false
+ // and leaves *output undefined.
+ bool FindFileContainingSymbol(const string& symbol_name,
+ google::protobuf::FileDescriptorProto* output)
+ GRPC_OVERRIDE;
+
+ // Find the file which defines an extension extending the given message type
+ // with the given field number. If found, fills in *output and returns true,
+ // otherwise returns false and leaves *output undefined. containing_type
+ // must be a fully-qualified type name.
+ bool FindFileContainingExtension(
+ const string& containing_type, int field_number,
+ google::protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
+
+ // Finds the tag numbers used by all known extensions of
+ // extendee_type, and appends them to output in an undefined
+ // order. This method is best-effort: it's not guaranteed that the
+ // database will find all extensions, and it's not guaranteed that
+ // FindFileContainingExtension will return true on all of the found
+ // numbers. Returns true if the search was successful, otherwise
+ // returns false and leaves output unchanged.
+ bool FindAllExtensionNumbers(const string& extendee_type,
+ std::vector<int>* output) GRPC_OVERRIDE;
+
+ // Provide a list of full names of registered services
+ bool GetServices(std::vector<std::string>* output);
+
+ private:
+ typedef ClientReaderWriter<
+ grpc::reflection::v1alpha::ServerReflectionRequest,
+ grpc::reflection::v1alpha::ServerReflectionResponse>
+ ClientStream;
+
+ const google::protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
+ const std::string& byte_fd_proto);
+
+ void AddFileFromResponse(
+ const grpc::reflection::v1alpha::FileDescriptorResponse& response);
+
+ const std::shared_ptr<ClientStream> GetStream();
+
+ void DoOneRequest(
+ const grpc::reflection::v1alpha::ServerReflectionRequest& request,
+ grpc::reflection::v1alpha::ServerReflectionResponse& response);
+
+ std::shared_ptr<ClientStream> stream_;
+ grpc::ClientContext ctx_;
+ std::unique_ptr<grpc::reflection::v1alpha::ServerReflection::Stub> stub_;
+ std::unordered_set<string> known_files_;
+ std::unordered_set<string> missing_symbols_;
+ std::unordered_map<string, std::unordered_set<int>> missing_extensions_;
+ std::unordered_map<string, std::vector<int>> cached_extension_numbers_;
+ std::mutex stream_mutex_;
+
+ google::protobuf::SimpleDescriptorDatabase cached_db_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_METRICS_SERVER_H