aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/client/credentials_test.cc3
-rw-r--r--test/cpp/end2end/client_crash_test.cc165
-rw-r--r--test/cpp/end2end/client_crash_test_server.cc94
-rw-r--r--test/cpp/end2end/end2end_test.cc124
-rw-r--r--test/cpp/end2end/server_crash_test.cc166
-rw-r--r--test/cpp/end2end/server_crash_test_client.cc93
-rw-r--r--test/cpp/end2end/thread_stress_test.cc10
-rw-r--r--test/cpp/interop/client_helper.cc6
-rw-r--r--test/cpp/qps/async_streaming_ping_pong_test.cc4
-rw-r--r--test/cpp/qps/async_unary_ping_pong_test.cc4
-rw-r--r--test/cpp/qps/client.h43
-rw-r--r--test/cpp/qps/client_async.cc89
-rw-r--r--test/cpp/qps/client_sync.cc14
-rw-r--r--test/cpp/qps/driver.cc45
-rw-r--r--test/cpp/qps/driver.h15
-rwxr-xr-xtest/cpp/qps/qps-sweep.sh29
-rw-r--r--test/cpp/qps/qps_driver.cc17
-rw-r--r--test/cpp/qps/qps_test.cc4
-rw-r--r--test/cpp/qps/qps_worker.cc24
-rw-r--r--test/cpp/qps/qpstest.proto2
-rw-r--r--test/cpp/qps/report.cc16
-rw-r--r--test/cpp/qps/server_async.cc125
-rw-r--r--test/cpp/qps/server_sync.cc16
-rwxr-xr-xtest/cpp/qps/single_run_localhost.sh28
-rw-r--r--test/cpp/qps/sync_streaming_ping_pong_test.cc4
-rw-r--r--test/cpp/qps/sync_unary_ping_pong_test.cc4
-rw-r--r--test/cpp/qps/worker.cc7
-rw-r--r--test/cpp/util/create_test_channel.cc6
-rw-r--r--test/cpp/util/create_test_channel.h2
-rw-r--r--test/cpp/util/fake_credentials.cc58
-rw-r--r--test/cpp/util/fake_credentials.h51
-rw-r--r--test/cpp/util/grpc_cli.cc2
-rw-r--r--test/cpp/util/messages.proto1
-rw-r--r--test/cpp/util/subprocess.cc59
-rw-r--r--test/cpp/util/subprocess.h61
35 files changed, 1170 insertions, 221 deletions
diff --git a/test/cpp/client/credentials_test.cc b/test/cpp/client/credentials_test.cc
index 6840418989..ee94f455a4 100644
--- a/test/cpp/client/credentials_test.cc
+++ b/test/cpp/client/credentials_test.cc
@@ -46,8 +46,7 @@ class CredentialsTest : public ::testing::Test {
};
TEST_F(CredentialsTest, InvalidServiceAccountCreds) {
- std::unique_ptr<Credentials> bad1 =
- ServiceAccountCredentials("", "", 1);
+ std::shared_ptr<Credentials> bad1 = ServiceAccountCredentials("", "", 1);
EXPECT_EQ(nullptr, bad1.get());
}
diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc
new file mode 100644
index 0000000000..e86681f75a
--- /dev/null
+++ b/test/cpp/end2end/client_crash_test.cc
@@ -0,0 +1,165 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <thread>
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/util/echo_duplicate.grpc.pb.h"
+#include "test/cpp/util/echo.grpc.pb.h"
+#include "src/cpp/server/thread_pool.h"
+#include <grpc++/channel_arguments.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/server_credentials.h>
+#include <grpc++/status.h>
+#include <grpc++/stream.h>
+#include <grpc++/time.h>
+#include <gtest/gtest.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+#include "test/cpp/util/subprocess.h"
+
+using grpc::cpp::test::util::EchoRequest;
+using grpc::cpp::test::util::EchoResponse;
+using std::chrono::system_clock;
+
+static std::string g_root;
+
+namespace grpc {
+namespace testing {
+
+namespace {
+
+class CrashTest : public ::testing::Test {
+ protected:
+ CrashTest() {}
+
+ std::unique_ptr<grpc::cpp::test::util::TestService::Stub>
+ CreateServerAndStub() {
+ auto port = grpc_pick_unused_port_or_die();
+ std::ostringstream addr_stream;
+ addr_stream << "localhost:" << port;
+ auto addr = addr_stream.str();
+ server_.reset(new SubProcess({
+ g_root + "/client_crash_test_server",
+ "--address=" + addr,
+ }));
+ GPR_ASSERT(server_);
+ return grpc::cpp::test::util::TestService::NewStub(
+ CreateChannel(addr, InsecureCredentials(), ChannelArguments()));
+ }
+
+ void KillServer() {
+ server_.reset();
+ // give some time for the TCP connection to drop
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(1)));
+ }
+
+ private:
+ std::unique_ptr<SubProcess> server_;
+};
+
+TEST_F(CrashTest, KillAfterWrite) {
+ auto stub = CreateServerAndStub();
+
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ auto stream = stub->BidiStream(&context);
+
+ request.set_message("Hello");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message("I'm going to kill you");
+ EXPECT_TRUE(stream->Write(request));
+
+ KillServer();
+
+ EXPECT_FALSE(stream->Read(&response));
+
+ EXPECT_FALSE(stream->Finish().IsOk());
+}
+
+TEST_F(CrashTest, KillBeforeWrite) {
+ auto stub = CreateServerAndStub();
+
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ auto stream = stub->BidiStream(&context);
+
+ request.set_message("Hello");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ KillServer();
+
+ request.set_message("You should be dead");
+ EXPECT_FALSE(stream->Write(request));
+ EXPECT_FALSE(stream->Read(&response));
+
+ EXPECT_FALSE(stream->Finish().IsOk());
+}
+
+} // namespace
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ std::string me = argv[0];
+ auto lslash = me.rfind('/');
+ if (lslash != std::string::npos) {
+ g_root = me.substr(0, lslash);
+ } else {
+ g_root = ".";
+ }
+
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/end2end/client_crash_test_server.cc b/test/cpp/end2end/client_crash_test_server.cc
new file mode 100644
index 0000000000..20808a0240
--- /dev/null
+++ b/test/cpp/end2end/client_crash_test_server.cc
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <gflags/gflags.h>
+
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/server_credentials.h>
+#include <grpc++/status.h>
+#include "test/cpp/util/echo.grpc.pb.h"
+
+DEFINE_string(address, "", "Address to bind to");
+
+using grpc::cpp::test::util::EchoRequest;
+using grpc::cpp::test::util::EchoResponse;
+
+// In some distros, gflags is in the namespace google, and in some others,
+// in gflags. This hack is enabling us to find both.
+namespace google {}
+namespace gflags {}
+using namespace google;
+using namespace gflags;
+
+namespace grpc {
+namespace testing {
+
+class ServiceImpl GRPC_FINAL : public ::grpc::cpp::test::util::TestService::Service {
+ Status BidiStream(ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream)
+ GRPC_OVERRIDE {
+ EchoRequest request;
+ EchoResponse response;
+ while (stream->Read(&request)) {
+ gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
+ response.set_message(request.message());
+ stream->Write(response);
+ }
+ return Status::OK;
+ }
+};
+
+void RunServer() {
+ ServiceImpl service;
+
+ ServerBuilder builder;
+ builder.AddListeningPort(FLAGS_address, grpc::InsecureServerCredentials());
+ builder.RegisterService(&service);
+ std::unique_ptr<Server> server(builder.BuildAndStart());
+ std::cout << "Server listening on " << FLAGS_address << std::endl;
+ server->Wait();
+}
+}
+}
+
+int main(int argc, char** argv) {
+ ParseCommandLineFlags(&argc, &argv, true);
+ grpc::testing::RunServer();
+
+ return 0;
+}
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index f35b16fe55..7a15591b44 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -33,11 +33,13 @@
#include <thread>
+#include "src/core/security/credentials.h"
+#include "src/cpp/server/thread_pool.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/echo_duplicate.grpc.pb.h"
#include "test/cpp/util/echo.grpc.pb.h"
-#include "src/cpp/server/thread_pool.h"
+#include "test/cpp/util/fake_credentials.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
@@ -94,18 +96,30 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
signal_client_ = true;
}
while (!context->IsCancelled()) {
- std::this_thread::sleep_for(std::chrono::microseconds(
- request->param().client_cancel_after_us()));
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(),
+ gpr_time_from_micros(request->param().client_cancel_after_us())));
}
return Status::Cancelled;
} else if (request->has_param() &&
request->param().server_cancel_after_us()) {
- std::this_thread::sleep_for(
- std::chrono::microseconds(request->param().server_cancel_after_us()));
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(),
+ gpr_time_from_micros(request->param().server_cancel_after_us())));
return Status::Cancelled;
} else {
EXPECT_FALSE(context->IsCancelled());
}
+
+ if (request->has_param() && request->param().echo_metadata()) {
+ const std::multimap<grpc::string, grpc::string>& client_metadata =
+ context->client_metadata();
+ for (std::multimap<grpc::string, grpc::string>::const_iterator iter =
+ client_metadata.begin();
+ iter != client_metadata.end(); ++iter) {
+ context->AddTrailingMetadata((*iter).first, (*iter).second);
+ }
+ }
return Status::OK;
}
@@ -180,7 +194,7 @@ class End2endTest : public ::testing::Test {
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
- InsecureServerCredentials());
+ FakeTransportSecurityServerCredentials());
builder.RegisterService(&service_);
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
@@ -192,8 +206,9 @@ class End2endTest : public ::testing::Test {
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void ResetStub() {
- std::shared_ptr<ChannelInterface> channel = CreateChannel(
- server_address_.str(), InsecureCredentials(), ChannelArguments());
+ std::shared_ptr<ChannelInterface> channel =
+ CreateChannel(server_address_.str(), FakeTransportSecurityCredentials(),
+ ChannelArguments());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
@@ -404,8 +419,9 @@ TEST_F(End2endTest, BidiStream) {
// Talk to the two services with the same name but different package names.
// The two stubs are created on the same channel.
TEST_F(End2endTest, DiffPackageServices) {
- std::shared_ptr<ChannelInterface> channel = CreateChannel(
- server_address_.str(), InsecureCredentials(), ChannelArguments());
+ std::shared_ptr<ChannelInterface> channel =
+ CreateChannel(server_address_.str(), FakeTransportSecurityCredentials(),
+ ChannelArguments());
EchoRequest request;
EchoResponse response;
@@ -429,7 +445,7 @@ TEST_F(End2endTest, DiffPackageServices) {
// rpc and stream should fail on bad credentials.
TEST_F(End2endTest, BadCredentials) {
- std::unique_ptr<Credentials> bad_creds = ServiceAccountCredentials("", "", 1);
+ std::shared_ptr<Credentials> bad_creds = ServiceAccountCredentials("", "", 1);
EXPECT_EQ(nullptr, bad_creds.get());
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), bad_creds, ChannelArguments());
@@ -438,7 +454,7 @@ TEST_F(End2endTest, BadCredentials) {
EchoRequest request;
EchoResponse response;
ClientContext context;
- grpc::string msg("hello");
+ request.set_message("Hello");
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ("", response.message());
@@ -455,7 +471,7 @@ TEST_F(End2endTest, BadCredentials) {
}
void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
- std::this_thread::sleep_for(std::chrono::microseconds(delay_us));
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_micros(delay_us)));
while (!service->signal_client()) {
}
context->TryCancel();
@@ -588,6 +604,88 @@ TEST_F(End2endTest, RpcMaxMessageSize) {
EXPECT_FALSE(s.IsOk());
}
+bool MetadataContains(const std::multimap<grpc::string, grpc::string>& metadata,
+ const grpc::string& key, const grpc::string& value) {
+ int count = 0;
+
+ for (std::multimap<grpc::string, grpc::string>::const_iterator iter =
+ metadata.begin();
+ iter != metadata.end(); ++iter) {
+ if ((*iter).first == key && (*iter).second == value) {
+ count++;
+ }
+ }
+ return count == 1;
+}
+
+TEST_F(End2endTest, SetPerCallCredentials) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ std::shared_ptr<Credentials> creds =
+ IAMCredentials("fake_token", "fake_selector");
+ context.set_credentials(creds);
+ request.set_message("Hello");
+ request.mutable_param()->set_echo_metadata(true);
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_EQ(request.message(), response.message());
+ EXPECT_TRUE(s.IsOk());
+ EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+ GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+ "fake_token"));
+ EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+ GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+ "fake_selector"));
+}
+
+TEST_F(End2endTest, InsecurePerCallCredentials) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ std::shared_ptr<Credentials> creds = InsecureCredentials();
+ context.set_credentials(creds);
+ request.set_message("Hello");
+ request.mutable_param()->set_echo_metadata(true);
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_EQ(StatusCode::CANCELLED, s.code());
+ EXPECT_EQ("Failed to set credentials to rpc.", s.details());
+}
+
+TEST_F(End2endTest, OverridePerCallCredentials) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ std::shared_ptr<Credentials> creds1 =
+ IAMCredentials("fake_token1", "fake_selector1");
+ context.set_credentials(creds1);
+ std::shared_ptr<Credentials> creds2 =
+ IAMCredentials("fake_token2", "fake_selector2");
+ context.set_credentials(creds2);
+ request.set_message("Hello");
+ request.mutable_param()->set_echo_metadata(true);
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+ GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+ "fake_token2"));
+ EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+ GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+ "fake_selector2"));
+ EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
+ GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+ "fake_token1"));
+ EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
+ GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+ "fake_selector1"));
+ EXPECT_EQ(request.message(), response.message());
+ EXPECT_TRUE(s.IsOk());
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/end2end/server_crash_test.cc b/test/cpp/end2end/server_crash_test.cc
new file mode 100644
index 0000000000..11d73aec7d
--- /dev/null
+++ b/test/cpp/end2end/server_crash_test.cc
@@ -0,0 +1,166 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <thread>
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/util/echo_duplicate.grpc.pb.h"
+#include "test/cpp/util/echo.grpc.pb.h"
+#include "src/cpp/server/thread_pool.h"
+#include <grpc++/channel_arguments.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/server_credentials.h>
+#include <grpc++/status.h>
+#include <grpc++/stream.h>
+#include <grpc++/time.h>
+#include <gtest/gtest.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+#include "test/cpp/util/subprocess.h"
+
+using grpc::cpp::test::util::EchoRequest;
+using grpc::cpp::test::util::EchoResponse;
+using std::chrono::system_clock;
+
+static std::string g_root;
+
+namespace grpc {
+namespace testing {
+
+namespace {
+
+class ServiceImpl GRPC_FINAL : public ::grpc::cpp::test::util::TestService::Service {
+ Status BidiStream(ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream)
+ GRPC_OVERRIDE {
+ EchoRequest request;
+ EchoResponse response;
+ while (stream->Read(&request)) {
+ gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
+ response.set_message(request.message());
+ stream->Write(response);
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(1)));
+ }
+ return Status::OK;
+ }
+
+ Status ResponseStream(ServerContext* context, const EchoRequest* request,
+ ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
+ EchoResponse response;
+ for (int i = 0;; i++) {
+ std::ostringstream msg;
+ msg << "Hello " << i;
+ response.set_message(msg.str());
+ if (!writer->Write(response)) break;
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(1)));
+ }
+ return Status::OK;
+ }
+};
+
+class CrashTest : public ::testing::Test {
+ protected:
+ CrashTest() {}
+
+ std::unique_ptr<Server>
+ CreateServerAndClient(const std::string& mode) {
+ auto port = grpc_pick_unused_port_or_die();
+ std::ostringstream addr_stream;
+ addr_stream << "localhost:" << port;
+ auto addr = addr_stream.str();
+ client_.reset(new SubProcess({
+ g_root + "/server_crash_test_client",
+ "--address=" + addr,
+ "--mode=" + mode
+ }));
+ GPR_ASSERT(client_);
+
+ ServerBuilder builder;
+ builder.AddListeningPort(addr, grpc::InsecureServerCredentials());
+ builder.RegisterService(&service_);
+ return builder.BuildAndStart();
+ }
+
+ void KillClient() {
+ client_.reset();
+ }
+
+ private:
+ std::unique_ptr<SubProcess> client_;
+ ServiceImpl service_;
+};
+
+TEST_F(CrashTest, ResponseStream) {
+ auto server = CreateServerAndClient("response");
+
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(5)));
+ KillClient();
+ server->Shutdown();
+}
+
+TEST_F(CrashTest, BidiStream) {
+ auto server = CreateServerAndClient("bidi");
+
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(5)));
+ KillClient();
+ server->Shutdown();
+}
+
+} // namespace
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ std::string me = argv[0];
+ auto lslash = me.rfind('/');
+ if (lslash != std::string::npos) {
+ g_root = me.substr(0, lslash);
+ } else {
+ g_root = ".";
+ }
+
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/end2end/server_crash_test_client.cc b/test/cpp/end2end/server_crash_test_client.cc
new file mode 100644
index 0000000000..497ccb4cb2
--- /dev/null
+++ b/test/cpp/end2end/server_crash_test_client.cc
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <gflags/gflags.h>
+
+#include <grpc++/channel_arguments.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/credentials.h>
+#include <grpc++/status.h>
+#include "test/cpp/util/echo.grpc.pb.h"
+
+DEFINE_string(address, "", "Address to connect to");
+DEFINE_string(mode, "", "Test mode to use");
+
+using grpc::cpp::test::util::EchoRequest;
+using grpc::cpp::test::util::EchoResponse;
+
+// In some distros, gflags is in the namespace google, and in some others,
+// in gflags. This hack is enabling us to find both.
+namespace google {}
+namespace gflags {}
+using namespace google;
+using namespace gflags;
+
+int main(int argc, char** argv) {
+ ParseCommandLineFlags(&argc, &argv, true);
+ auto stub = grpc::cpp::test::util::TestService::NewStub(
+ grpc::CreateChannel(FLAGS_address, grpc::InsecureCredentials(), grpc::ChannelArguments()));
+
+ EchoRequest request;
+ EchoResponse response;
+ grpc::ClientContext context;
+
+ if (FLAGS_mode == "bidi") {
+ auto stream = stub->BidiStream(&context);
+ for (int i = 0;; i++) {
+ std::ostringstream msg;
+ msg << "Hello " << i;
+ request.set_message(msg.str());
+ GPR_ASSERT(stream->Write(request));
+ GPR_ASSERT(stream->Read(&response));
+ GPR_ASSERT(response.message() == request.message());
+ }
+ } else if (FLAGS_mode == "response") {
+ EchoRequest request;
+ request.set_message("Hello");
+ auto stream = stub->ResponseStream(&context, request);
+ for (;;) {
+ GPR_ASSERT(stream->Read(&response));
+ }
+ } else {
+ gpr_log(GPR_ERROR, "invalid test mode '%s'", FLAGS_mode.c_str());
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index 12656128c0..310227a29c 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -94,14 +94,16 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
signal_client_ = true;
}
while (!context->IsCancelled()) {
- std::this_thread::sleep_for(std::chrono::microseconds(
- request->param().client_cancel_after_us()));
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(),
+ gpr_time_from_micros(request->param().client_cancel_after_us())));
}
return Status::Cancelled;
} else if (request->has_param() &&
request->param().server_cancel_after_us()) {
- std::this_thread::sleep_for(
- std::chrono::microseconds(request->param().server_cancel_after_us()));
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(),
+ gpr_time_from_micros(request->param().server_cancel_after_us())));
return Status::Cancelled;
} else {
EXPECT_FALSE(context->IsCancelled());
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index a1dea383e6..09fd1c8913 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -82,7 +82,7 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
FLAGS_server_port);
if (test_case == "service_account_creds") {
- std::unique_ptr<Credentials> creds;
+ std::shared_ptr<Credentials> creds;
GPR_ASSERT(FLAGS_enable_ssl);
grpc::string json_key = GetServiceAccountJsonKey();
std::chrono::seconds token_lifetime = std::chrono::hours(1);
@@ -91,13 +91,13 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
return CreateTestChannel(host_port, FLAGS_server_host_override,
FLAGS_enable_ssl, FLAGS_use_prod_roots, creds);
} else if (test_case == "compute_engine_creds") {
- std::unique_ptr<Credentials> creds;
+ std::shared_ptr<Credentials> creds;
GPR_ASSERT(FLAGS_enable_ssl);
creds = ComputeEngineCredentials();
return CreateTestChannel(host_port, FLAGS_server_host_override,
FLAGS_enable_ssl, FLAGS_use_prod_roots, creds);
} else if (test_case == "jwt_token_creds") {
- std::unique_ptr<Credentials> creds;
+ std::shared_ptr<Credentials> creds;
GPR_ASSERT(FLAGS_enable_ssl);
grpc::string json_key = GetServiceAccountJsonKey();
std::chrono::seconds token_lifetime = std::chrono::hours(1);
diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc
index a1822b7e15..d4871c0ba1 100644
--- a/test/cpp/qps/async_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/async_streaming_ping_pong_test.cc
@@ -64,8 +64,8 @@ static void RunAsyncStreamingPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ ReportQPS(*result);
+ ReportLatency(*result);
}
} // namespace testing
diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc
index 8b037a8656..35f188c986 100644
--- a/test/cpp/qps/async_unary_ping_pong_test.cc
+++ b/test/cpp/qps/async_unary_ping_pong_test.cc
@@ -64,8 +64,8 @@ static void RunAsyncUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ ReportQPS(*result);
+ ReportLatency(*result);
}
} // namespace testing
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2dc5b3860f..dc3a9f2ac5 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -113,24 +113,26 @@ class Client {
: done_(false),
new_(nullptr),
impl_([this, idx, client]() {
- for (;;) {
- // run the loop body
- bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
- // lock, see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- done_ = true;
- }
- if (done_) {return;}
- // check if we're marking, swap out the histogram if so
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
- cv_.notify_one();
- }
+ for (;;) {
+ // run the loop body
+ bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
+ // lock, see if we're done
+ std::lock_guard<std::mutex> g(mu_);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ done_ = true;
}
- }) {}
+ if (done_) {
+ return;
+ }
+ // check if we're marking, swap out the histogram if so
+ if (new_) {
+ new_->Swap(&histogram_);
+ new_ = nullptr;
+ cv_.notify_one();
+ }
+ }
+ }) {}
~Thread() {
{
@@ -168,10 +170,9 @@ class Client {
std::unique_ptr<Timer> timer_;
};
-std::unique_ptr<Client>
- CreateSynchronousUnaryClient(const ClientConfig& args);
-std::unique_ptr<Client>
- CreateSynchronousStreamingClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateSynchronousStreamingClient(
+ const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index e3ab57728d..00bbd8a8a0 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -128,16 +128,16 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
class AsyncClient : public Client {
public:
explicit AsyncClient(const ClientConfig& config,
- std::function<void(CompletionQueue*, TestService::Stub*,
- const SimpleRequest&)> setup_ctx) :
- Client(config) {
+ std::function<void(CompletionQueue*, TestService::Stub*,
+ const SimpleRequest&)> setup_ctx)
+ : Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
}
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end();
- channel++) {
+ channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
setup_ctx(cq, channel->get_stub(), request_);
@@ -155,16 +155,19 @@ class AsyncClient : public Client {
}
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx)
- GRPC_OVERRIDE GRPC_FINAL {
+ bool ThreadFunc(Histogram* histogram,
+ size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok,
- std::chrono::system_clock::now() +
- std::chrono::seconds(1))) {
- case CompletionQueue::SHUTDOWN: return false;
- case CompletionQueue::TIMEOUT: return true;
- case CompletionQueue::GOT_EVENT: break;
+ switch (cli_cqs_[thread_idx]->AsyncNext(
+ &got_tag, &ok,
+ std::chrono::system_clock::now() + std::chrono::seconds(1))) {
+ case CompletionQueue::SHUTDOWN:
+ return false;
+ case CompletionQueue::TIMEOUT:
+ return true;
+ case CompletionQueue::GOT_EVENT:
+ break;
}
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
@@ -177,18 +180,20 @@ class AsyncClient : public Client {
return true;
}
+
private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
};
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
public:
- explicit AsyncUnaryClient(const ClientConfig& config) :
- AsyncClient(config, SetupCtx) {
+ explicit AsyncUnaryClient(const ClientConfig& config)
+ : AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads());
}
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
-private:
+
+ private:
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
@@ -205,12 +210,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
- TestService::Stub *stub, const RequestType &req,
- std::function<
- std::unique_ptr<grpc::ClientAsyncReaderWriter<
- RequestType,ResponseType>>(
- TestService::Stub *, grpc::ClientContext *, void *)> start_req,
- std::function<void(grpc::Status, ResponseType *)> on_done)
+ TestService::Stub* stub, const RequestType& req,
+ std::function<std::unique_ptr<
+ grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ TestService::Stub*, grpc::ClientContext*, void*)> start_req,
+ std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
req_(req),
@@ -221,7 +225,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
start_(Timer::Now()),
stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {}
~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
- bool RunNextState(bool ok, Histogram *hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist);
}
void StartNewClone() GRPC_OVERRIDE {
@@ -229,59 +233,58 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
}
private:
- bool ReqSent(bool ok, Histogram *) {
- return StartWrite(ok);
- }
+ bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
bool StartWrite(bool ok) {
if (!ok) {
- return(false);
+ return (false);
}
start_ = Timer::Now();
next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
}
- bool WriteDone(bool ok, Histogram *) {
+ bool WriteDone(bool ok, Histogram*) {
if (!ok) {
- return(false);
+ return (false);
}
next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
- bool ReadDone(bool ok, Histogram *hist) {
+ bool ReadDone(bool ok, Histogram* hist) {
hist->Add((Timer::Now() - start_) * 1e9);
return StartWrite(ok);
}
grpc::ClientContext context_;
- TestService::Stub *stub_;
+ TestService::Stub* stub_;
RequestType req_;
ResponseType response_;
- bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram *);
- std::function<void(grpc::Status, ResponseType *)> callback_;
- std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
- RequestType,ResponseType>>(
- TestService::Stub *, grpc::ClientContext *, void *)> start_req_;
+ bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
+ std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ TestService::Stub*, grpc::ClientContext*, void*)> start_req_;
grpc::Status status_;
double start_;
- std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType,ResponseType>>
- stream_;
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
+ stream_;
};
class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
- explicit AsyncStreamingClient(const ClientConfig &config) :
- AsyncClient(config, SetupCtx) {
+ explicit AsyncStreamingClient(const ClientConfig& config)
+ : AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads());
}
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
-private:
+
+ private:
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
- const SimpleRequest& req) {
+ const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
- auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
- void *tag) {
+ auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
+ void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 264293561d..c28dc91321 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -99,7 +99,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public:
SynchronousStreamingClient(const ClientConfig& config)
- : SynchronousClient(config), context_(num_threads_), stream_(num_threads_) {
+ : SynchronousClient(config),
+ context_(num_threads_),
+ stream_(num_threads_) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
@@ -110,8 +112,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
EndThreads();
for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
if (*stream) {
- (*stream)->WritesDone();
- EXPECT_TRUE((*stream)->Finish().IsOk());
+ (*stream)->WritesDone();
+ EXPECT_TRUE((*stream)->Finish().IsOk());
}
}
}
@@ -119,7 +121,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
double start = Timer::Now();
if (stream_[thread_idx]->Write(request_) &&
- stream_[thread_idx]->Read(&responses_[thread_idx])) {
+ stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9);
return true;
}
@@ -128,8 +130,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
private:
std::vector<grpc::ClientContext> context_;
- std::vector<std::unique_ptr<grpc::ClientReaderWriter<
- SimpleRequest, SimpleResponse>>> stream_;
+ std::vector<std::unique_ptr<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 9f7d3b56a4..bf12730f97 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -44,9 +44,11 @@
#include <thread>
#include <deque>
#include <vector>
+#include <unistd.h>
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
using std::list;
using std::thread;
@@ -75,13 +77,10 @@ static deque<string> get_hosts(const string& name) {
}
}
-ScenarioResult RunScenario(const ClientConfig& initial_client_config,
- size_t num_clients,
- const ServerConfig& server_config,
- size_t num_servers,
- int warmup_seconds,
- int benchmark_seconds,
- int spawn_local_worker_count) {
+std::unique_ptr<ScenarioResult> RunScenario(
+ const ClientConfig& initial_client_config, size_t num_clients,
+ const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
+ int benchmark_seconds, int spawn_local_worker_count) {
// ClientContext allocator (all are destroyed at scope exit)
list<ClientContext> contexts;
auto alloc_context = [&contexts]() {
@@ -89,6 +88,11 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
return &contexts.back();
};
+ // To be added to the result, containing the final configuration used for
+ // client and config (incluiding host, etc.)
+ ClientConfig result_client_config;
+ ServerConfig result_server_config;
+
// Get client, server lists
auto workers = get_hosts("QPS_WORKERS");
ClientConfig client_config = initial_client_config;
@@ -96,6 +100,16 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
// Spawn some local workers if desired
vector<unique_ptr<QpsWorker>> local_workers;
for (int i = 0; i < abs(spawn_local_worker_count); i++) {
+ // act as if we're a new test -- gets a good rng seed
+ static bool called_init = false;
+ if (!called_init) {
+ char args_buf[100];
+ strcpy(args_buf, "some-benchmark");
+ char* args[] = {args_buf};
+ grpc_test_init(1, args);
+ called_init = true;
+ }
+
int driver_port = grpc_pick_unused_port_or_die();
int benchmark_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
@@ -127,6 +141,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
sd.stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
ServerArgs args;
+ result_server_config = server_config;
+ result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(alloc_context()));
GPR_ASSERT(sd.stream->Write(args));
@@ -156,6 +172,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
cd.stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
ClientArgs args;
+ result_client_config = client_config;
+ result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(alloc_context()));
GPR_ASSERT(cd.stream->Write(args));
@@ -193,10 +211,13 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
// Wait some time
gpr_log(GPR_INFO, "Running");
- gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
+ gpr_sleep_until(
+ gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
// Finish a run
- ScenarioResult result;
+ std::unique_ptr<ScenarioResult> result(new ScenarioResult);
+ result->client_config = result_client_config;
+ result->server_config = result_server_config;
gpr_log(GPR_INFO, "Finishing");
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Write(server_mark));
@@ -207,14 +228,14 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
- result.server_resources.push_back(ResourceUsage{
+ result->server_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
- result.latencies.MergeProto(stats.latencies());
- result.client_resources.push_back(ResourceUsage{
+ result->latencies.MergeProto(stats.latencies());
+ result->client_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index eb7119a89d..5e9d4b3cb9 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -34,6 +34,8 @@
#ifndef TEST_QPS_DRIVER_H
#define TEST_QPS_DRIVER_H
+#include <memory>
+
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qpstest.grpc.pb.h"
@@ -49,15 +51,14 @@ struct ScenarioResult {
Histogram latencies;
std::vector<ResourceUsage> client_resources;
std::vector<ResourceUsage> server_resources;
+ ClientConfig client_config;
+ ServerConfig server_config;
};
-ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config,
- size_t num_clients,
- const grpc::testing::ServerConfig& server_config,
- size_t num_servers,
- int warmup_seconds,
- int benchmark_seconds,
- int spawn_local_worker_count);
+std::unique_ptr<ScenarioResult> RunScenario(
+ const grpc::testing::ClientConfig& client_config, size_t num_clients,
+ const grpc::testing::ServerConfig& server_config, size_t num_servers,
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh
index 7bc6eade2c..cb93201933 100755
--- a/test/cpp/qps/qps-sweep.sh
+++ b/test/cpp/qps/qps-sweep.sh
@@ -1,5 +1,34 @@
#!/bin/sh
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
if [ x"$QPS_WORKERS" == x ]; then
echo Error: Must set QPS_WORKERS variable in form \
"host:port,host:port,..." 1>&2
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index fc8e04201c..008830de4e 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -100,17 +100,16 @@ int main(int argc, char** argv) {
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
- FLAGS_server_threads < FLAGS_client_channels *
- FLAGS_outstanding_rpcs_per_channel));
+ FLAGS_server_threads <
+ FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
- auto result = RunScenario(client_config, FLAGS_num_clients,
- server_config, FLAGS_num_servers,
- FLAGS_warmup_seconds, FLAGS_benchmark_seconds,
- FLAGS_local_workers);
+ const auto result = RunScenario(
+ client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
+ FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers);
- ReportQPSPerCore(result, server_config);
- ReportLatency(result);
- ReportTimes(result);
+ ReportQPSPerCore(*result, server_config);
+ ReportLatency(*result);
+ ReportTimes(*result);
return 0;
}
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index f567e4cf06..9a81d0fc90 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -64,8 +64,8 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPSPerCore(result, server_config);
- ReportLatency(result);
+ ReportQPSPerCore(*result, server_config);
+ ReportLatency(*result);
}
} // namespace testing
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 46d70dce52..fb49271991 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -64,17 +64,19 @@ namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT:
- return (config.rpc_type() == RpcType::UNARY) ?
- CreateSynchronousUnaryClient(config) :
- CreateSynchronousStreamingClient(config);
+ return (config.rpc_type() == RpcType::UNARY)
+ ? CreateSynchronousUnaryClient(config)
+ : CreateSynchronousStreamingClient(config);
case ClientType::ASYNC_CLIENT:
- return (config.rpc_type() == RpcType::UNARY) ?
- CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config);
+ return (config.rpc_type() == RpcType::UNARY)
+ ? CreateAsyncUnaryClient(config)
+ : CreateAsyncStreamingClient(config);
}
abort();
}
-std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port) {
+std::unique_ptr<Server> CreateServer(const ServerConfig& config,
+ int server_port) {
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, server_port);
@@ -86,7 +88,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port
class WorkerImpl GRPC_FINAL : public Worker::Service {
public:
- explicit WorkerImpl(int server_port) : server_port_(server_port), acquired_(false) {}
+ explicit WorkerImpl(int server_port)
+ : server_port_(server_port), acquired_(false) {}
Status RunTest(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
@@ -97,7 +100,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_client.prof");
- Status ret = RunTestBody(ctx,stream);
+ Status ret = RunTestBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@@ -111,7 +114,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_server.prof");
- Status ret = RunServerBody(ctx,stream);
+ Status ret = RunServerBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@@ -226,8 +229,7 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
server_ = std::move(builder.BuildAndStart());
}
-QpsWorker::~QpsWorker() {
-}
+QpsWorker::~QpsWorker() {}
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto
index 1553ef5f07..122a7df1ac 100644
--- a/test/cpp/qps/qpstest.proto
+++ b/test/cpp/qps/qpstest.proto
@@ -102,6 +102,7 @@ message ClientConfig {
// only for async client:
optional int32 async_client_threads = 7;
optional RpcType rpc_type = 8 [default=UNARY];
+ optional string host = 9;
}
// Request current stats
@@ -129,6 +130,7 @@ message ServerConfig {
required ServerType server_type = 1;
optional int32 threads = 2 [default=1];
optional bool enable_ssl = 3 [default=false];
+ optional string host = 4;
}
message ServerArgs {
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 29d88da344..3115ff3bfb 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -48,18 +48,20 @@ void ReportQPS(const ScenarioResult& result) {
}
// QPS: XXX (YYY/server core)
-void ReportQPSPerCore(const ScenarioResult& result, const ServerConfig& server_config) {
- auto qps =
- result.latencies.Count() /
- average(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; });
+void ReportQPSPerCore(const ScenarioResult& result,
+ const ServerConfig& server_config) {
+ auto qps = result.latencies.Count() /
+ average(result.client_resources,
+ [](ResourceUsage u) { return u.wall_time; });
- gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, qps/server_config.threads());
+ gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
+ qps / server_config.threads());
}
// Latency (50/90/95/99/99.9%-ile): AA/BB/CC/DD/EE us
void ReportLatency(const ScenarioResult& result) {
- gpr_log(GPR_INFO, "Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us",
+ gpr_log(GPR_INFO,
+ "Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us",
result.latencies.Percentile(50) / 1000,
result.latencies.Percentile(90) / 1000,
result.latencies.Percentile(95) / 1000,
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 6cb3192908..b9998405f6 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -64,7 +64,7 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
- char* server_address = NULL;
+ char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
@@ -95,16 +95,19 @@ class AsyncQpsServerTest : public Server {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
- void* got_tag;
+ void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) {
- ServerRpcContext* ctx = detag(got_tag);
+ ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- if (ctx->RunNextState(ok) == false) {
+ bool still_going = ctx->RunNextState(ok);
+ std::lock_guard<std::mutex> g(shutdown_mutex_);
+ if (!shutdown_) {
// this RPC context is done, so refresh it
- std::lock_guard<std::mutex> g(shutdown_mutex_);
- if (!shutdown_) {
+ if (!still_going) {
ctx->Reset();
}
+ } else {
+ return;
}
}
return;
@@ -116,11 +119,15 @@ class AsyncQpsServerTest : public Server {
{
std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true;
- srv_cq_->Shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
+ srv_cq_->Shutdown();
+ bool ok;
+ void *got_tag;
+ while (srv_cq_->Next(&got_tag, &ok))
+ ;
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
@@ -133,23 +140,23 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext() {}
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
- virtual void Reset() = 0; // start this back at a clean state
+ virtual void Reset() = 0; // start this back at a clean state
};
- static void* tag(ServerRpcContext* func) {
- return reinterpret_cast<void*>(func);
+ static void *tag(ServerRpcContext *func) {
+ return reinterpret_cast<void *>(func);
}
- static ServerRpcContext* detag(void* tag) {
- return reinterpret_cast<ServerRpcContext*>(tag);
+ static ServerRpcContext *detag(void *tag) {
+ return reinterpret_cast<ServerRpcContext *>(tag);
}
template <class RequestType, class ResponseType>
class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextUnaryImpl(
- std::function<void(ServerContext*, RequestType*,
- grpc::ServerAsyncResponseWriter<ResponseType>*,
- void*)> request_method,
- std::function<grpc::Status(const RequestType*, ResponseType*)>
+ std::function<void(ServerContext *, RequestType *,
+ grpc::ServerAsyncResponseWriter<ResponseType> *,
+ void *)> request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: next_state_(&ServerRpcContextUnaryImpl::invoker),
request_method_(request_method),
@@ -159,7 +166,9 @@ class AsyncQpsServerTest : public Server {
AsyncQpsServerTest::tag(this));
}
~ServerRpcContextUnaryImpl() GRPC_OVERRIDE {}
- bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
+ bool RunNextState(bool ok) GRPC_OVERRIDE {
+ return (this->*next_state_)(ok);
+ }
void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext();
req_ = RequestType();
@@ -192,10 +201,10 @@ class AsyncQpsServerTest : public Server {
ServerContext srv_ctx_;
RequestType req_;
bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
- std::function<void(ServerContext*, RequestType*,
- grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
+ std::function<void(ServerContext *, RequestType *,
+ grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType*, ResponseType*)>
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
@@ -204,9 +213,9 @@ class AsyncQpsServerTest : public Server {
class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextStreamingImpl(
- std::function<void(ServerContext *,
- grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType> *, void *)> request_method,
+ std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter<
+ ResponseType, RequestType> *,
+ void *)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: next_state_(&ServerRpcContextStreamingImpl::request_done),
@@ -215,14 +224,15 @@ class AsyncQpsServerTest : public Server {
stream_(&srv_ctx_) {
request_method_(&srv_ctx_, &stream_, AsyncQpsServerTest::tag(this));
}
- ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {
+ ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {}
+ bool RunNextState(bool ok) GRPC_OVERRIDE {
+ return (this->*next_state_)(ok);
}
- bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext();
req_ = RequestType();
- stream_ = grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType>(&srv_ctx_);
+ stream_ =
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(&srv_ctx_);
// Then request the method
next_state_ = &ServerRpcContextStreamingImpl::request_done;
@@ -241,47 +251,47 @@ class AsyncQpsServerTest : public Server {
bool read_done(bool ok) {
if (ok) {
- // invoke the method
- ResponseType response;
- // Call the RPC processing function
- grpc::Status status = invoke_method_(&req_, &response);
- // initiate the write
- stream_.Write(response, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::write_done;
- } else { // client has sent writes done
- // finish the stream
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ // invoke the method
+ ResponseType response;
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response);
+ // initiate the write
+ stream_.Write(response, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::write_done;
+ } else { // client has sent writes done
+ // finish the stream
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::finish_done;
}
return true;
}
bool write_done(bool ok) {
// now go back and get another streaming read!
if (ok) {
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::read_done;
- }
- else {
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ } else {
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::finish_done;
}
return true;
}
- bool finish_done(bool ok) {return false; /* reset the context */ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
ServerContext srv_ctx_;
RequestType req_;
bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
- std::function<void(ServerContext *,
- grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType> *, void *)> request_method_;
+ std::function<void(
+ ServerContext *,
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
+ request_method_;
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_;
- grpc::ServerAsyncReaderWriter<ResponseType,RequestType> stream_;
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
- static Status ProcessRPC(const SimpleRequest* request,
- SimpleResponse* response) {
+ static Status ProcessRPC(const SimpleRequest *request,
+ SimpleResponse *response) {
if (request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
@@ -294,19 +304,20 @@ class AsyncQpsServerTest : public Server {
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
TestService::AsyncService async_service_;
- std::function<void(ServerContext*, SimpleRequest*,
- grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)>
+ std::function<void(ServerContext *, SimpleRequest *,
+ grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext*, grpc::ServerAsyncReaderWriter<
- SimpleResponse,SimpleRequest>*, void*)>
+ std::function<void(
+ ServerContext *,
+ grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_;
- std::forward_list<ServerRpcContext*> contexts_;
+ std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;
bool shutdown_;
};
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config,
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index 2770233a7c..bc00de9ced 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -70,18 +70,18 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
}
return Status::OK;
}
- Status StreamingCall(ServerContext *context,
- ServerReaderWriter<SimpleResponse, SimpleRequest>*
- stream) GRPC_OVERRIDE {
+ Status StreamingCall(
+ ServerContext* context,
+ ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) GRPC_OVERRIDE {
SimpleRequest request;
while (stream->Read(&request)) {
SimpleResponse response;
if (request.response_size() > 0) {
- if (!Server::SetPayload(request.response_type(),
- request.response_size(),
- response.mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
+ if (!Server::SetPayload(request.response_type(),
+ request.response_size(),
+ response.mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
}
stream->Write(response);
}
diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh
index 2f60b4e49d..9d76f08f80 100755
--- a/test/cpp/qps/single_run_localhost.sh
+++ b/test/cpp/qps/single_run_localhost.sh
@@ -1,4 +1,32 @@
#!/bin/sh
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# performs a single qps run with one client and one server
diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc
index 48c7ff63e0..218306846b 100644
--- a/test/cpp/qps/sync_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc
@@ -63,8 +63,8 @@ static void RunSynchronousStreamingPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ ReportQPS(*result);
+ ReportLatency(*result);
}
} // namespace testing
diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc
index 4c4de6377b..137ef79f2f 100644
--- a/test/cpp/qps/sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/sync_unary_ping_pong_test.cc
@@ -63,8 +63,8 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ ReportQPS(*result);
+ ReportLatency(*result);
}
} // namespace testing
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 281c617382..dfc102fc17 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -37,6 +37,7 @@
#include <thread>
#include <grpc/grpc.h>
+#include <grpc/support/time.h>
#include <gflags/gflags.h>
#include "qps_worker.h"
@@ -47,7 +48,7 @@ DEFINE_int32(server_port, 0, "Spawned server port.");
static bool got_sigint = false;
-static void sigint_handler(int x) {got_sigint = true;}
+static void sigint_handler(int x) { got_sigint = true; }
namespace grpc {
namespace testing {
@@ -56,7 +57,7 @@ static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
while (!got_sigint) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(5)));
}
}
@@ -69,6 +70,6 @@ int main(int argc, char** argv) {
signal(SIGINT, sigint_handler);
grpc::testing::RunServer();
-
+
return 0;
}
diff --git a/test/cpp/util/create_test_channel.cc b/test/cpp/util/create_test_channel.cc
index f040acc4b1..dc48fa2d87 100644
--- a/test/cpp/util/create_test_channel.cc
+++ b/test/cpp/util/create_test_channel.cc
@@ -58,13 +58,13 @@ namespace grpc {
std::shared_ptr<ChannelInterface> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
- const std::unique_ptr<Credentials>& creds) {
+ const std::shared_ptr<Credentials>& creds) {
ChannelArguments channel_args;
if (enable_ssl) {
const char* roots_certs = use_prod_roots ? "" : test_root_cert;
SslCredentialsOptions ssl_opts = {roots_certs, "", ""};
- std::unique_ptr<Credentials> channel_creds = SslCredentials(ssl_opts);
+ std::shared_ptr<Credentials> channel_creds = SslCredentials(ssl_opts);
if (!server.empty() && !override_hostname.empty()) {
channel_args.SetSslTargetNameOverride(override_hostname);
@@ -84,7 +84,7 @@ std::shared_ptr<ChannelInterface> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots) {
return CreateTestChannel(server, override_hostname, enable_ssl,
- use_prod_roots, std::unique_ptr<Credentials>());
+ use_prod_roots, std::shared_ptr<Credentials>());
}
// Shortcut for end2end and interop tests.
diff --git a/test/cpp/util/create_test_channel.h b/test/cpp/util/create_test_channel.h
index 5c298ce850..5f2609ddd8 100644
--- a/test/cpp/util/create_test_channel.h
+++ b/test/cpp/util/create_test_channel.h
@@ -52,7 +52,7 @@ std::shared_ptr<ChannelInterface> CreateTestChannel(
std::shared_ptr<ChannelInterface> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
- const std::unique_ptr<Credentials>& creds);
+ const std::shared_ptr<Credentials>& creds);
} // namespace grpc
diff --git a/test/cpp/util/fake_credentials.cc b/test/cpp/util/fake_credentials.cc
new file mode 100644
index 0000000000..f5b83b8159
--- /dev/null
+++ b/test/cpp/util/fake_credentials.cc
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc_security.h>
+#include <grpc++/channel_arguments.h>
+#include <grpc++/credentials.h>
+#include <grpc++/server_credentials.h>
+#include "src/cpp/client/channel.h"
+#include "src/cpp/client/secure_credentials.h"
+#include "src/cpp/server/secure_server_credentials.h"
+
+namespace grpc {
+namespace testing {
+
+std::shared_ptr<Credentials> FakeTransportSecurityCredentials() {
+ grpc_credentials* c_creds = grpc_fake_transport_security_credentials_create();
+ return std::shared_ptr<Credentials>(new SecureCredentials(c_creds));
+}
+
+std::shared_ptr<ServerCredentials> FakeTransportSecurityServerCredentials() {
+ grpc_server_credentials* c_creds =
+ grpc_fake_transport_security_server_credentials_create();
+ return std::shared_ptr<ServerCredentials>(
+ new SecureServerCredentials(c_creds));
+}
+
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/util/fake_credentials.h b/test/cpp/util/fake_credentials.h
new file mode 100644
index 0000000000..e1ba7bb9e4
--- /dev/null
+++ b/test/cpp/util/fake_credentials.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_TEST_CPP_UTIL_FAKE_CREDENTIALS_H
+#define GRPC_TEST_CPP_UTIL_FAKE_CREDENTIALS_H
+
+#include <memory>
+
+namespace grpc {
+class Credentials;
+class ServerCredentials;
+
+namespace testing {
+
+std::shared_ptr<Credentials> FakeTransportSecurityCredentials();
+std::shared_ptr<ServerCredentials> FakeTransportSecurityServerCredentials();
+
+} // namespace testing
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_UTIL_FAKE_CREDENTIALS_H
diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc
index d71a7a0b77..ad3c0af877 100644
--- a/test/cpp/util/grpc_cli.cc
+++ b/test/cpp/util/grpc_cli.cc
@@ -104,7 +104,7 @@ int main(int argc, char** argv) {
std::stringstream input_stream;
input_stream << input_file.rdbuf();
- std::unique_ptr<grpc::Credentials> creds;
+ std::shared_ptr<grpc::Credentials> creds;
if (!FLAGS_enable_ssl) {
creds = grpc::InsecureCredentials();
} else {
diff --git a/test/cpp/util/messages.proto b/test/cpp/util/messages.proto
index a79bce1f30..062f66c091 100644
--- a/test/cpp/util/messages.proto
+++ b/test/cpp/util/messages.proto
@@ -36,6 +36,7 @@ message RequestParams {
optional bool echo_deadline = 1;
optional int32 client_cancel_after_us = 2;
optional int32 server_cancel_after_us = 3;
+ optional bool echo_metadata = 4;
}
message EchoRequest {
diff --git a/test/cpp/util/subprocess.cc b/test/cpp/util/subprocess.cc
new file mode 100644
index 0000000000..d758f629ac
--- /dev/null
+++ b/test/cpp/util/subprocess.cc
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/cpp/util/subprocess.h"
+
+#include <vector>
+
+#include <grpc/support/subprocess.h>
+
+namespace grpc {
+
+static gpr_subprocess *MakeProcess(std::initializer_list<std::string> args) {
+ std::vector<const char *> vargs;
+ for (auto it = args.begin(); it != args.end(); ++it) {
+ vargs.push_back(it->c_str());
+ }
+ return gpr_subprocess_create(vargs.size(), &vargs[0]);
+}
+
+SubProcess::SubProcess(std::initializer_list<std::string> args)
+ : subprocess_(MakeProcess(args)) {}
+
+SubProcess::~SubProcess() { gpr_subprocess_destroy(subprocess_); }
+
+int SubProcess::Join() { return gpr_subprocess_join(subprocess_); }
+
+void SubProcess::Interrupt() { gpr_subprocess_interrupt(subprocess_); }
+
+} // namespace grpc \ No newline at end of file
diff --git a/test/cpp/util/subprocess.h b/test/cpp/util/subprocess.h
new file mode 100644
index 0000000000..8fafe3d177
--- /dev/null
+++ b/test/cpp/util/subprocess.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_TEST_CPP_UTIL_SUBPROCESS_H
+#define GRPC_TEST_CPP_UTIL_SUBPROCESS_H
+
+#include <initializer_list>
+#include <string>
+
+struct gpr_subprocess;
+
+namespace grpc {
+
+class SubProcess {
+ public:
+ SubProcess(std::initializer_list<std::string> args);
+ ~SubProcess();
+
+ int Join();
+ void Interrupt();
+
+ private:
+ SubProcess(const SubProcess& other);
+ SubProcess& operator=(const SubProcess& other);
+
+ gpr_subprocess* const subprocess_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_UTIL_SUBPROCESS_H