aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/common/alarm_cpp_test.cc (renamed from test/cpp/common/alarm_test.cc)29
-rw-r--r--test/cpp/end2end/hybrid_end2end_test.cc2
-rw-r--r--test/cpp/end2end/thread_stress_test.cc143
-rw-r--r--test/cpp/qps/driver.cc13
-rw-r--r--test/cpp/qps/driver.h3
-rw-r--r--test/cpp/qps/qps_driver.cc7
-rw-r--r--test/cpp/qps/qps_worker.cc24
-rw-r--r--test/cpp/qps/qps_worker.h7
-rw-r--r--test/cpp/qps/worker.cc2
-rw-r--r--test/cpp/util/time_test.cc4
10 files changed, 191 insertions, 43 deletions
diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_cpp_test.cc
index 09df6852a5..4745ef14ec 100644
--- a/test/cpp/common/alarm_test.cc
+++ b/test/cpp/common/alarm_cpp_test.cc
@@ -35,58 +35,47 @@
#include <grpc++/completion_queue.h>
#include <gtest/gtest.h>
-#include <grpc++/completion_queue.h>
#include "test/core/util/test_config.h"
namespace grpc {
namespace {
-class TestTag : public CompletionQueueTag {
- public:
- TestTag() : tag_(0) {}
- TestTag(intptr_t tag) : tag_(tag) {}
- bool FinalizeResult(void** tag, bool* status) { return true; }
- intptr_t tag() { return tag_; }
-
- private:
- intptr_t tag_;
-};
-
TEST(AlarmTest, RegularExpiry) {
CompletionQueue cq;
- TestTag input_tag(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &input_tag);
+ void* junk = reinterpret_cast<void*>(1618033);
+ Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), junk);
- TestTag* output_tag;
+ void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
(void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
- EXPECT_EQ(output_tag->tag(), input_tag.tag());
+ EXPECT_EQ(junk, output_tag);
}
TEST(AlarmTest, Cancellation) {
CompletionQueue cq;
- TestTag input_tag(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), &input_tag);
+ void* junk = reinterpret_cast<void*>(1618033);
+ Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), junk);
alarm.Cancel();
- TestTag* output_tag;
+ void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
(void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_FALSE(ok);
- EXPECT_EQ(output_tag->tag(), input_tag.tag());
+ EXPECT_EQ(junk, output_tag);
}
} // namespace
} // namespace grpc
int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc
index f8405627f9..c72e20628f 100644
--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test {
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
- cqs_.push_back(std::move(builder.AddCompletionQueue()));
+ cqs_.push_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
}
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index 4e8860e843..e246c0b0e2 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -45,6 +45,7 @@
#include <grpc/support/time.h>
#include <gtest/gtest.h>
+#include "src/core/surface/api_trace.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -54,6 +55,11 @@ using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;
+const int kNumThreads = 100; // Number of threads
+const int kNumAsyncSendThreads = 2;
+const int kNumAsyncReceiveThreads = 50;
+const int kNumRpcs = 1000; // Number of RPCs per thread
+
namespace grpc {
namespace testing {
@@ -84,7 +90,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
MaybeEchoDeadline(context, request, response);
if (request->has_param() && request->param().client_cancel_after_us()) {
{
- std::unique_lock<std::mutex> lock(mu_);
+ unique_lock<mutex> lock(mu_);
signal_client_ = true;
}
while (!context->IsCancelled()) {
@@ -149,13 +155,13 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
}
bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
+ unique_lock<mutex> lock(mu_);
return signal_client_;
}
private:
bool signal_client_;
- std::mutex mu_;
+ mutex mu_;
};
class TestServiceImplDupPkg
@@ -168,11 +174,10 @@ class TestServiceImplDupPkg
}
};
-class End2endTest : public ::testing::Test {
- protected:
- End2endTest() : kMaxMessageSize_(8192) {}
-
- void SetUp() GRPC_OVERRIDE {
+class CommonStressTest {
+ public:
+ CommonStressTest() : kMaxMessageSize_(8192) {}
+ void SetUp() {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
@@ -185,15 +190,15 @@ class End2endTest : public ::testing::Test {
builder.RegisterService(&dup_pkg_service_);
server_ = builder.BuildAndStart();
}
-
- void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
-
+ void TearDown() { server_->Shutdown(); }
void ResetStub() {
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
+ private:
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
@@ -202,6 +207,16 @@ class End2endTest : public ::testing::Test {
TestServiceImplDupPkg dup_pkg_service_;
};
+class End2endTest : public ::testing::Test {
+ protected:
+ End2endTest() {}
+ void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
+ void TearDown() GRPC_OVERRIDE { common_.TearDown(); }
+ void ResetStub() { common_.ResetStub(); }
+
+ CommonStressTest common_;
+};
+
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
EchoRequest request;
EchoResponse response;
@@ -216,17 +231,115 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
}
TEST_F(End2endTest, ThreadStress) {
- ResetStub();
+ common_.ResetStub();
std::vector<std::thread*> threads;
- for (int i = 0; i < 100; ++i) {
- threads.push_back(new std::thread(SendRpc, stub_.get(), 1000));
+ for (int i = 0; i < kNumThreads; ++i) {
+ threads.push_back(new std::thread(SendRpc, common_.GetStub(), kNumRpcs));
}
- for (int i = 0; i < 100; ++i) {
+ for (int i = 0; i < kNumThreads; ++i) {
threads[i]->join();
delete threads[i];
}
}
+class AsyncClientEnd2endTest : public ::testing::Test {
+ protected:
+ AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
+
+ void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
+ void TearDown() GRPC_OVERRIDE {
+ void* ignored_tag;
+ bool ignored_ok;
+ while (cq_.Next(&ignored_tag, &ignored_ok))
+ ;
+ common_.TearDown();
+ }
+
+ void Wait() {
+ unique_lock<mutex> l(mu_);
+ while (rpcs_outstanding_ != 0) {
+ cv_.wait(l);
+ }
+
+ cq_.Shutdown();
+ }
+
+ struct AsyncClientCall {
+ EchoResponse response;
+ ClientContext context;
+ Status status;
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
+ };
+
+ void AsyncSendRpc(int num_rpcs) {
+ for (int i = 0; i < num_rpcs; ++i) {
+ AsyncClientCall* call = new AsyncClientCall;
+ EchoRequest request;
+ request.set_message("Hello: " + std::to_string(i));
+ call->response_reader =
+ common_.GetStub()->AsyncEcho(&call->context, request, &cq_);
+ call->response_reader->Finish(&call->response, &call->status,
+ (void*)call);
+
+ unique_lock<mutex> l(mu_);
+ rpcs_outstanding_++;
+ }
+ }
+
+ void AsyncCompleteRpc() {
+ while (true) {
+ void* got_tag;
+ bool ok = false;
+ if (!cq_.Next(&got_tag, &ok)) break;
+ AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
+ if (!ok) {
+ gpr_log(GPR_DEBUG, "Error: %d", call->status.error_code());
+ }
+ delete call;
+
+ bool notify;
+ {
+ unique_lock<mutex> l(mu_);
+ rpcs_outstanding_--;
+ notify = (rpcs_outstanding_ == 0);
+ }
+ if (notify) {
+ cv_.notify_all();
+ }
+ }
+ }
+
+ CommonStressTest common_;
+ CompletionQueue cq_;
+ mutex mu_;
+ condition_variable cv_;
+ int rpcs_outstanding_;
+};
+
+TEST_F(AsyncClientEnd2endTest, ThreadStress) {
+ common_.ResetStub();
+ std::vector<std::thread*> send_threads, completion_threads;
+ for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
+ completion_threads.push_back(new std::thread(
+ &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this));
+ }
+ for (int i = 0; i < kNumAsyncSendThreads; ++i) {
+ send_threads.push_back(
+ new std::thread(&AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc,
+ this, kNumRpcs));
+ }
+ for (int i = 0; i < kNumAsyncSendThreads; ++i) {
+ send_threads[i]->join();
+ delete send_threads[i];
+ }
+
+ Wait();
+ for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
+ completion_threads[i]->join();
+ delete completion_threads[i];
+ }
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index c70b0303b8..80f6ada409 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -384,5 +384,18 @@ std::unique_ptr<ScenarioResult> RunScenario(
delete[] servers;
return result;
}
+
+void RunQuit() {
+ // Get client, server lists
+ auto workers = get_workers("QPS_WORKERS");
+ for (size_t i = 0; i < workers.size(); i++) {
+ auto stub = WorkerService::NewStub(
+ CreateChannel(workers[i], InsecureChannelCredentials()));
+ Void dummy;
+ grpc::ClientContext ctx;
+ GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+ }
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 2a7cf805e5..3af61f7391 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -70,6 +70,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
+void RunQuit();
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index ffc8a83fc5..69fb4d75e8 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -78,6 +78,8 @@ DEFINE_int32(client_core_limit, -1, "Limit on client cores to use");
DEFINE_bool(secure_test, false, "Run a secure test");
+DEFINE_bool(quit, false, "Quit the workers");
+
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
using grpc::testing::ClientType;
@@ -90,6 +92,11 @@ namespace grpc {
namespace testing {
static void QpsDriver() {
+ if (FLAGS_quit) {
+ RunQuit();
+ return;
+ }
+
RpcType rpc_type;
GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type));
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 7e9e05f7ec..9442017ddf 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -103,8 +103,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
- explicit WorkerServiceImpl(int server_port)
- : acquired_(false), server_port_(server_port) {}
+ WorkerServiceImpl(int server_port, QpsWorker* worker)
+ : acquired_(false), server_port_(server_port), worker_(worker) {}
Status RunClient(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
@@ -140,6 +140,16 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
return Status::OK;
}
+ Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
+ InstanceGuard g(this);
+ if (!g.Acquired()) {
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ }
+
+ worker_->MarkDone();
+ return Status::OK;
+ }
+
private:
// Protect against multiple clients using this worker at once.
class InstanceGuard {
@@ -250,10 +260,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
std::mutex mu_;
bool acquired_;
int server_port_;
+ QpsWorker* worker_;
};
QpsWorker::QpsWorker(int driver_port, int server_port) {
- impl_.reset(new WorkerServiceImpl(server_port));
+ impl_.reset(new WorkerServiceImpl(server_port, this));
+ gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);
@@ -269,5 +281,11 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
QpsWorker::~QpsWorker() {}
+bool QpsWorker::Done() const {
+ return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0));
+}
+void QpsWorker::MarkDone() {
+ gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1));
+}
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
index 27de69fa65..624c182100 100644
--- a/test/cpp/qps/qps_worker.h
+++ b/test/cpp/qps/qps_worker.h
@@ -36,6 +36,8 @@
#include <memory>
+#include <grpc/support/atm.h>
+
namespace grpc {
class Server;
@@ -49,9 +51,14 @@ class QpsWorker {
explicit QpsWorker(int driver_port, int server_port = 0);
~QpsWorker();
+ bool Done() const;
+ void MarkDone();
+
private:
std::unique_ptr<WorkerServiceImpl> impl_;
std::unique_ptr<Server> server_;
+
+ gpr_atm done_;
};
} // namespace testing
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index a1e73e9abe..f42cfe3255 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -56,7 +56,7 @@ namespace testing {
static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
- while (!got_sigint) {
+ while (!got_sigint && !worker.Done()) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(5, GPR_TIMESPAN)));
}
diff --git a/test/cpp/util/time_test.cc b/test/cpp/util/time_test.cc
index 1e501dfd28..48c6ce7697 100644
--- a/test/cpp/util/time_test.cc
+++ b/test/cpp/util/time_test.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,7 @@ namespace {
class TimeTest : public ::testing::Test {};
TEST_F(TimeTest, AbsolutePointTest) {
- long us = 10000000L;
+ int64_t us = 10000000L;
gpr_timespec ts = gpr_time_from_micros(us, GPR_TIMESPAN);
ts.clock_type = GPR_CLOCK_REALTIME;
system_clock::time_point tp{microseconds(us)};