aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-07-15 09:16:05 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-07-15 09:16:05 -0700
commit30a4a544be649fd06df84a386f430e5b102de4f7 (patch)
tree5471aad5ed69edc4cf78f1944dd37088636b7fa5 /test/cpp
parent14bed44db2040c3a1e38935b372146ccbd81dddc (diff)
parent51d002244816d53623d1743df15fe26e94f9e8b6 (diff)
Merge branch 'filter_call_init_failure' into filter_api
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc25
-rw-r--r--test/cpp/end2end/end2end_test.cc3
-rw-r--r--test/cpp/qps/client.h68
-rw-r--r--test/cpp/qps/client_async.cc103
-rw-r--r--test/cpp/qps/client_sync.cc28
-rw-r--r--test/cpp/qps/driver.cc135
-rw-r--r--test/cpp/qps/driver.h2
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py11
-rw-r--r--test/cpp/qps/json_run_localhost.cc2
-rw-r--r--test/cpp/qps/qps_json_driver.cc18
-rw-r--r--test/cpp/qps/qps_worker.cc40
-rw-r--r--test/cpp/qps/server_async.cc57
-rw-r--r--test/cpp/util/slice_test.cc10
13 files changed, 340 insertions, 162 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 6c7eae53a4..4a8936d281 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -345,6 +345,31 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
+// We do not need to protect notify because the use is synchronized.
+void ServerWait(Server* server, int* notify) {
+ server->Wait();
+ *notify = 1;
+}
+TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
+ int notify = 0;
+ std::thread* wait_thread =
+ new std::thread(&ServerWait, server_.get(), &notify);
+ ResetStub();
+ SendRpc(1);
+ EXPECT_EQ(0, notify);
+ server_->Shutdown();
+ wait_thread->join();
+ EXPECT_EQ(1, notify);
+ delete wait_thread;
+}
+
+TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
+ ResetStub();
+ SendRpc(1);
+ server_->Shutdown();
+ server_->Wait();
+}
+
// Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 354a59cedd..0f87ae3e44 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) {
request.mutable_param()->set_response_message_length(kResponseSize);
ClientContext context;
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::now() + std::chrono::seconds(20);
+ context.set_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(kResponseSize, response.message().size());
EXPECT_TRUE(s.ok());
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 047bd16408..4045e13460 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
}
};
+class HistogramEntry GRPC_FINAL {
+ public:
+ HistogramEntry() : used_(false) {}
+ bool used() const { return used_; }
+ double value() const { return value_; }
+ void set_value(double v) {
+ used_ = true;
+ value_ = v;
+ }
+
+ private:
+ bool used_;
+ double value_;
+};
+
class Client {
public:
Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -151,10 +166,21 @@ class Client {
return stats;
}
+ // Must call AwaitThreadsCompletion before destructor to avoid a race
+ // between destructor and invocation of virtual ThreadFunc
+ void AwaitThreadsCompletion() {
+ DestroyMultithreading();
+ std::unique_lock<std::mutex> g(thread_completion_mu_);
+ while (threads_remaining_ != 0) {
+ threads_complete_.wait(g);
+ }
+ }
+
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
+ threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@@ -162,7 +188,8 @@ class Client {
void EndThreads() { threads_.clear(); }
- virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+ virtual void DestroyMultithreading() = 0;
+ virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -215,7 +242,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -230,15 +256,10 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_stats_ = n;
+ n->Swap(&histogram_);
}
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- while (new_stats_ != nullptr) {
- cv_.wait(g);
- };
- }
+ void EndSwap() {}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
@@ -252,29 +273,26 @@ class Client {
void ThreadFunc() {
for (;;) {
// run the loop body
- const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
- // lock, see if we're done
+ HistogramEntry entry;
+ const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+ // lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
+ if (entry.used()) {
+ histogram_.Add(entry.value());
+ }
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
+ client_->CompleteThread();
return;
}
- // check if we're resetting stats, swap out the histogram if so
- if (new_stats_) {
- new_stats_->Swap(&histogram_);
- new_stats_ = nullptr;
- cv_.notify_one();
- }
}
}
std::mutex mu_;
- std::condition_variable cv_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -286,6 +304,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
+
+ std::mutex thread_completion_mu_;
+ size_t threads_remaining_;
+ std::condition_variable threads_complete_;
+
+ void CompleteThread() {
+ std::lock_guard<std::mutex> g(thread_completion_mu_);
+ threads_remaining_--;
+ if (threads_remaining_ == 0) {
+ threads_complete_.notify_all();
+ }
+ }
};
template <class StubType, class RequestType>
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..5d9cb4bd0c 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
@@ -48,7 +47,6 @@
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +62,7 @@ class ClientRpcContext {
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
- virtual bool RunNextState(bool, Histogram* hist) = 0;
+ virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
virtual ClientRpcContext* StartNewClone() = 0;
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext* detag(void* t) {
@@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
}
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
@@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::INVALID;
return false;
@@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
next_issuers_.emplace_back(NextIssuer(i));
+ shutdown_state_.emplace_back(new PerThreadShutdownState());
}
using namespace std::placeholders;
@@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
void* got_tag;
bool ok;
while ((*cq)->Next(&got_tag, &ok)) {
@@ -201,32 +199,16 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
- bool ThreadFunc(Histogram* histogram,
- size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
- void* got_tag;
- bool ok;
-
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (!ctx->RunNextState(ok, histogram)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- auto clone = ctx->StartNewClone();
- clone->Start(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
- }
- return true;
- } else { // queue is shutting down
- return false;
- }
- }
-
protected:
const int num_async_threads_;
private:
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
+ };
+
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
@@ -235,9 +217,60 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
return num_threads;
}
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+ for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ (*cq)->Shutdown();
+ }
+ this->EndThreads(); // this needed for resolution
+ }
+
+ bool ThreadFunc(HistogramEntry* entry,
+ size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
+ void* got_tag;
+ bool ok;
+
+ switch (cli_cqs_[thread_idx]->AsyncNext(
+ &got_tag, &ok,
+ std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
+ case CompletionQueue::GOT_EVENT: {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ return true;
+ } else if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ auto clone = ctx->StartNewClone();
+ clone->Start(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
+ }
+ return true;
+ }
+ case CompletionQueue::TIMEOUT: {
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ return true;
+ }
+ return true;
+ }
+ case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
+ // done
+ return true;
+ }
+ GPR_UNREACHABLE_CODE(return true);
+ }
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
+ std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -253,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
- ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncUnaryClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -298,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -330,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -382,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -430,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -462,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -518,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e..25c7823553 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
@@ -46,7 +45,6 @@
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -55,7 +53,6 @@
#include "src/core/lib/profiling/timers.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/cpp/qps/client.h"
-#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/usage_timer.h"
@@ -90,6 +87,9 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
+
+ private:
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
: SynchronousClient(config) {
StartThreads(num_threads_);
}
- ~SynchronousUnaryClient() { EndThreads(); }
+ ~SynchronousUnaryClient() {}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = UsageTimer::Now();
@@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::ClientContext context;
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
- histogram->Add((UsageTimer::Now() - start) * 1e9);
+ entry->set_value((UsageTimer::Now() - start) * 1e9);
return s.ok();
}
};
@@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
- EndThreads();
- for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
- stream++) {
+ for (size_t i = 0; i < num_threads_; i++) {
+ auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
- EXPECT_TRUE((*stream)->Finish().ok());
+ Status s = (*stream)->Finish();
+ EXPECT_TRUE(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+ s.error_message().c_str());
+ }
}
}
delete[] stream_;
delete[] context_;
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
double start = UsageTimer::Now();
if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
- histogram->Add((UsageTimer::Now() - start) * 1e9);
+ entry->set_value((UsageTimer::Now() - start) * 1e9);
return true;
}
return false;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 08bf045883..2aeaea51f2 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
- assert(s.ok());
+ GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
@@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts));
- GPR_ASSERT(servers[i].stream->Write(args));
+ if (!servers[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
+ }
ServerStatus init_status;
- GPR_ASSERT(servers[i].stream->Read(&init_status));
+ if (!servers[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
+ }
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@@ -345,9 +349,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = per_client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts));
- GPR_ASSERT(clients[i].stream->Write(args));
+ if (!clients[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
+ }
ClientStatus init_status;
- GPR_ASSERT(clients[i].stream->Read(&init_status));
+ if (!clients[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
+ }
}
// Let everything warmup
@@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark()->set_reset(true);
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
}
ServerStatus server_status;
ClientStatus client_status;
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Read(&server_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Read(&client_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+ }
}
// Wait some time
@@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
gpr_log(GPR_INFO, "Finishing clients");
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
- GPR_ASSERT(client->stream->WritesDone());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
+ if (!client->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
- const auto& stats = client_status.stats();
- merged_latencies.MergeProto(stats.latencies());
- result->add_client_stats()->CopyFrom(stats);
- GPR_ASSERT(!client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ // Read the client final status
+ if (client->stream->Read(&client_status)) {
+ gpr_log(GPR_INFO, "Received final status from client %zu", i);
+ const auto& stats = client_status.stats();
+ merged_latencies.MergeProto(stats.latencies());
+ result->add_client_stats()->CopyFrom(stats);
+ // That final status should be the last message on the client stream
+ GPR_ASSERT(!client->stream->Read(&client_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Finish().ok());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ Status s = client->stream->Finish();
+ result->add_client_success(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
gpr_log(GPR_INFO, "Finishing servers");
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
- GPR_ASSERT(server->stream->WritesDone());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
+ if (!server->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
- result->add_server_stats()->CopyFrom(server_status.stats());
- result->add_server_cores(server_status.cores());
- GPR_ASSERT(!server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ // Read the server final status
+ if (server->stream->Read(&server_status)) {
+ gpr_log(GPR_INFO, "Received final status from server %zu", i);
+ result->add_server_stats()->CopyFrom(server_status.stats());
+ result->add_server_cores(server_status.cores());
+ // That final status should be the last message on the server stream
+ GPR_ASSERT(!server->stream->Read(&server_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Finish().ok());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ Status s = server->stream->Finish();
+ result->add_server_success(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] servers;
@@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
-void RunQuit() {
+bool RunQuit() {
// Get client, server lists
+ bool result = true;
auto workers = get_workers("QPS_WORKERS");
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
@@ -438,8 +495,14 @@ void RunQuit() {
Void dummy;
grpc::ClientContext ctx;
ctx.set_fail_fast(false);
- GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+ Status s = stub->QuitWorker(&ctx, dummy, &dummy);
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
+ s.error_message().c_str());
+ result = false;
+ }
}
+ return result;
}
} // namespace testing
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 3a5cf138f1..93f4370caf 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
-void RunQuit();
+bool RunQuit();
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index 34b8151441..4ff4e44b8b 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -45,9 +45,10 @@ import performance.scenario_config as scenario_config
def _scenario_json_string(scenario_json):
# tweak parameters to get fast test times
- scenario_json['warmup_seconds'] = 1
+ scenario_json['warmup_seconds'] = 0
scenario_json['benchmark_seconds'] = 1
- return json.dumps(scenario_config.remove_nonproto_fields(scenario_json))
+ scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]}
+ return json.dumps(scenarios_json)
def threads_of_type(scenario_json, path):
d = scenario_json
@@ -72,8 +73,7 @@ print yaml.dump({
{
'name': 'json_run_localhost',
'shortname': 'json_run_localhost:%s' % scenario_json['name'],
- 'args': ['--scenario_json',
- pipes.quote(_scenario_json_string(scenario_json))],
+ 'args': ['--scenarios_json', _scenario_json_string(scenario_json)],
'ci_platforms': ['linux', 'mac', 'posix', 'windows'],
'platforms': ['linux', 'mac', 'posix', 'windows'],
'flaky': False,
@@ -81,7 +81,8 @@ print yaml.dump({
'boringssl': True,
'defaults': 'boringssl',
'cpu_cost': guess_cpu(scenario_json),
- 'exclude_configs': []
+ 'exclude_configs': [],
+ 'timeout_seconds': 3*60
}
for scenario_json in scenario_config.CXXLanguage().scenarios()
]
diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc
index 6545dc2917..74e40fbf1a 100644
--- a/test/cpp/qps/json_run_localhost.cc
+++ b/test/cpp/qps/json_run_localhost.cc
@@ -75,7 +75,7 @@ int main(int argc, char** argv) {
for (int i = 1; i < argc; i++) {
args.push_back(argv[i]);
}
- SubProcess(args).Join();
+ GPR_ASSERT(SubProcess(args).Join() == 0);
for (auto it = jobs.begin(); it != jobs.end(); ++it) {
(*it)->Interrupt();
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index f5d739f893..1524ebbc38 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
namespace grpc {
namespace testing {
-static void QpsDriver() {
+static bool QpsDriver() {
grpc::string json;
bool scfile = (FLAGS_scenarios_file != "");
@@ -81,13 +81,13 @@ static void QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
- RunQuit();
- return;
+ return RunQuit();
}
// Parse into an array of scenarios
Scenarios scenarios;
ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
+ bool success = true;
// Make sure that there is at least some valid scenario here
GPR_ASSERT(scenarios.scenarios_size() > 0);
@@ -109,7 +109,15 @@ static void QpsDriver() {
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
+
+ for (int i = 0; success && i < result->client_success_size(); i++) {
+ success = result->client_success(i);
+ }
+ for (int i = 0; success && i < result->server_success_size(); i++) {
+ success = result->server_success(i);
+ }
}
+ return success;
}
} // namespace testing
@@ -118,7 +126,7 @@ static void QpsDriver() {
int main(int argc, char **argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- grpc::testing::QpsDriver();
+ bool ok = grpc::testing::QpsDriver();
- return 0;
+ return ok ? 0 : 1;
}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index f514e23e85..d3e53fe14a 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -33,7 +33,6 @@
#include "test/cpp/qps/qps_worker.h"
-#include <cassert>
#include <memory>
#include <mutex>
#include <sstream>
@@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
}
ScopedProfile profile("qps_client.prof", false);
Status ret = RunClientBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunClient: Returning");
return ret;
}
@@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
}
ScopedProfile profile("qps_server.prof", false);
Status ret = RunServerBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunServer: Returning");
return ret;
}
@@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
}
worker_->MarkDone();
@@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
}
gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
}
gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
}
gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
+ gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
+ client->AwaitThreadsCompletion();
+
gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}
@@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
if (server_port_ != 0) {
args.mutable_setup()->set_port(server_port_);
@@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
}
gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c9954d0d02..dea8746331 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server {
auto process_rpc_bound =
std::bind(process_rpc, config.payload_config(), _1, _2);
- for (int i = 0; i < 10000 / num_threads; i++) {
+ for (int i = 0; i < 15000; i++) {
for (int j = 0; j < num_threads; j++) {
if (request_unary_function) {
auto request_unary =
@@ -123,21 +123,24 @@ class AsyncQpsServerTest : public Server {
for (int i = 0; i < num_threads; i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
- }
- for (int i = 0; i < num_threads; i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
- (*ss)->set_shutdown();
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+ auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
+ server_->Shutdown(deadline);
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
}
- server_->Shutdown();
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
- (*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
@@ -150,22 +153,24 @@ class AsyncQpsServerTest : public Server {
}
private:
- void ThreadFunc(int rank) {
+ void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- const bool still_going = ctx->RunNextState(ok);
- if (!shutdown_state_[rank]->shutdown()) {
- // this RPC context is done, so refresh it
- if (!still_going) {
- ctx->Reset();
- }
- } else {
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ const bool still_going = ctx->RunNextState(ok);
+ // if this RPC context is done, refresh it
+ if (!still_going) {
+ ctx->Reset();
+ }
}
return;
}
@@ -333,24 +338,12 @@ class AsyncQpsServerTest : public Server {
ServiceType async_service_;
std::forward_list<ServerRpcContext *> contexts_;
- class PerThreadShutdownState {
- public:
- PerThreadShutdownState() : shutdown_(false) {}
-
- bool shutdown() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return shutdown_;
- }
-
- void set_shutdown() {
- std::lock_guard<std::mutex> lock(mutex_);
- shutdown_ = true;
- }
-
- private:
- mutable std::mutex mutex_;
- bool shutdown_;
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
};
+
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
diff --git a/test/cpp/util/slice_test.cc b/test/cpp/util/slice_test.cc
index de7ff031ab..45799ae157 100644
--- a/test/cpp/util/slice_test.cc
+++ b/test/cpp/util/slice_test.cc
@@ -68,6 +68,16 @@ TEST_F(SliceTest, Empty) {
CheckSlice(empty_slice, "");
}
+TEST_F(SliceTest, Cslice) {
+ gpr_slice s = gpr_slice_from_copied_string(kContent);
+ Slice spp(s, Slice::STEAL_REF);
+ CheckSlice(spp, kContent);
+ gpr_slice c_slice = spp.c_slice();
+ EXPECT_EQ(GPR_SLICE_START_PTR(s), GPR_SLICE_START_PTR(c_slice));
+ EXPECT_EQ(GPR_SLICE_END_PTR(s), GPR_SLICE_END_PTR(c_slice));
+ gpr_slice_unref(c_slice);
+}
+
} // namespace
} // namespace grpc