aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-03-14 08:43:13 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-03-14 08:43:13 -0700
commit231233271349a8c57cf306bcc8329783da38e512 (patch)
tree733284eeec13b4bbf56eba6ab95c094829fe2b60 /test
parente1e459e5fe5a09a8a744d94d5f3e0cda1c2631b3 (diff)
parent2697f2141f8534a80bb4db0d253e3657dfe3eddc (diff)
Merge pull request #5444 from vjpai/debug_qps_stream
Make performance benchmarking code more canonical in structure
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/client.h35
-rw-r--r--test/cpp/qps/driver.cc32
2 files changed, 27 insertions, 40 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2dc83f0f29..92e77eed9b 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -123,15 +123,13 @@ class Client {
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
- }
- std::unique_ptr<UsageTimer> timer(new UsageTimer);
- timer_.swap(timer);
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
+ threads_[i]->Swap(&to_merge[i]);
latencies.Merge(to_merge[i]);
}
delete[] to_merge;
+
+ std::unique_ptr<UsageTimer> timer(new UsageTimer);
+ timer_.swap(timer);
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
@@ -227,7 +225,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -240,16 +237,9 @@ class Client {
impl_.join();
}
- void BeginSwap(Histogram* n) {
+ void Swap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_stats_ = n;
- }
-
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- while (new_stats_ != nullptr) {
- cv_.wait(g);
- };
+ n->Swap(&histogram_);
}
void MergeStatsInto(Histogram* hist) {
@@ -263,10 +253,11 @@ class Client {
void ThreadFunc() {
for (;;) {
+ // lock since the thread should only be doing one thing at a time
+ std::lock_guard<std::mutex> g(mu_);
// run the loop body
const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
- // lock, see if we're done
- std::lock_guard<std::mutex> g(mu_);
+ // see if we're done
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
@@ -274,19 +265,11 @@ class Client {
if (done_) {
return;
}
- // check if we're resetting stats, swap out the histogram if so
- if (new_stats_) {
- new_stats_->Swap(&histogram_);
- new_stats_ = nullptr;
- cv_.notify_one();
- }
}
}
std::mutex mu_;
- std::condition_variable cv_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 1c7fdf8796..bc8780f74d 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -348,19 +348,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
std::unique_ptr<ScenarioResult> result(new ScenarioResult);
result->client_config = result_client_config;
result->server_config = result_server_config;
- gpr_log(GPR_INFO, "Finishing");
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
- }
+ gpr_log(GPR_INFO, "Finishing clients");
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
- }
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
- const auto& stats = server_status.stats();
- result->server_resources.emplace_back(
- stats.time_elapsed(), stats.time_user(), stats.time_system(),
- server_status.cores());
+ GPR_ASSERT(client->stream->WritesDone());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
@@ -368,17 +359,30 @@ std::unique_ptr<ScenarioResult> RunScenario(
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
+ GPR_ASSERT(!client->stream->Read(&client_status));
}
-
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->WritesDone());
GPR_ASSERT(client->stream->Finish().ok());
}
+ delete[] clients;
+
+ gpr_log(GPR_INFO, "Finishing servers");
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Write(server_mark));
GPR_ASSERT(server->stream->WritesDone());
+ }
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Read(&server_status));
+ const auto& stats = server_status.stats();
+ result->server_resources.emplace_back(
+ stats.time_elapsed(), stats.time_user(), stats.time_system(),
+ server_status.cores());
+ GPR_ASSERT(!server->stream->Read(&server_status));
+ }
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Finish().ok());
}
- delete[] clients;
+
delete[] servers;
return result;
}