author     Craig Tiller <ctiller@google.com>  2017-10-10 22:14:56 -0700
committer  Craig Tiller <ctiller@google.com>  2017-10-10 22:14:56 -0700
commit     5d38ac60b84dfebc20d142501e8b55e3154c8d65 (patch)
tree       8708ad4ad3c49762e2cc1a7ad7eded9f52bdc077 /test
parent     b8b68761aed8053d894b04a56e6a3e158fed801d (diff)
parent     0a64225aab8b8b66890839cd04655ca96f86d59a (diff)
Merge branch 'timer' into pid++
Diffstat (limited to 'test')
-rw-r--r--  test/core/surface/init_test.c            2
-rw-r--r--  test/cpp/end2end/async_end2end_test.cc  32
-rw-r--r--  test/cpp/qps/client.h                   28
-rw-r--r--  test/cpp/qps/client_async.cc             1
-rw-r--r--  test/cpp/qps/client_sync.cc             56
5 files changed, 72 insertions, 47 deletions
diff --git a/test/core/surface/init_test.c b/test/core/surface/init_test.c
index a9e80575af..b835a2a884 100644
--- a/test/core/surface/init_test.c
+++ b/test/core/surface/init_test.c
@@ -53,7 +53,7 @@ static void test_plugin() {
}
static void test_repeatedly() {
- for (int i = 0; i < 100000; i++) {
+ for (int i = 0; i < 1000; i++) {
grpc_init();
grpc_shutdown();
}
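
Not part of the patch: the hunk above trims the init/shutdown stress loop from 100000 to 1000 iterations. As a minimal standalone sketch of the same pattern, grpc_init() and grpc_shutdown() are reference-counted in the C core, so each call pair must balance:

#include <grpc/grpc.h>

int main() {
  // Same loop shape as test_repeatedly(), outside the test harness.
  for (int i = 0; i < 1000; i++) {
    grpc_init();      // bumps the library refcount, initializing on first use
    grpc_shutdown();  // drops it, tearing the library down when it hits zero
  }
  return 0;
}
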
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index f938aea40e..a14b4d5295 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -102,14 +102,23 @@ class Verifier {
explicit Verifier(bool spin) : spin_(spin) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
- expectations_[tag(i)] = expect_ok;
+ return ExpectUnless(i, expect_ok, false);
+ }
+ // ExpectUnless sets the expected ok value for a specific tag
+ // unless the tag was already marked seen (as a result of ExpectMaybe)
+ Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
+ if (!seen) {
+ expectations_[tag(i)] = expect_ok;
+ }
return *this;
}
- // AcceptOnce sets the expected ok value for a specific tag, but does not
+ // ExpectMaybe sets the expected ok value for a specific tag, but does not
// require it to appear
// If it does, sets *seen to true
- Verifier& AcceptOnce(int i, bool expect_ok, bool* seen) {
- maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
+ Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
+ if (!*seen) {
+ maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
+ }
return *this;
}
@@ -569,13 +578,13 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
Verifier(GetParam().disable_blocking)
.Expect(2, true)
- .AcceptOnce(3, true, &seen3)
+ .ExpectMaybe(3, true, &seen3)
.Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking)
- .AcceptOnce(3, true, &seen3)
+ .ExpectUnless(3, true, seen3)
.Expect(4, true)
.Verify(cq_.get());
@@ -602,7 +611,6 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
- EXPECT_TRUE(seen3);
}
// One ping, two pongs.
@@ -853,13 +861,13 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
Verifier(GetParam().disable_blocking)
.Expect(2, true)
- .AcceptOnce(3, true, &seen3)
+ .ExpectMaybe(3, true, &seen3)
.Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking)
- .AcceptOnce(3, true, &seen3)
+ .ExpectUnless(3, true, seen3)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -880,7 +888,6 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
- EXPECT_TRUE(seen3);
}
// One ping, one pong. Using server:WriteLast api
@@ -910,13 +917,13 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
Verifier(GetParam().disable_blocking)
.Expect(2, true)
- .AcceptOnce(3, true, &seen3)
+ .ExpectMaybe(3, true, &seen3)
.Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking)
- .AcceptOnce(3, true, &seen3)
+ .ExpectUnless(3, true, seen3)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -939,7 +946,6 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
- EXPECT_TRUE(seen3);
}
// Metadata tests
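
Not part of the patch, just a reading of the Verifier changes above: AcceptOnce is renamed to ExpectMaybe, which records in *seen whether the optional tag actually completed, and the new ExpectUnless re-registers the tag on a later Verify only if it was not already consumed. Tag 3 therefore gets verified in one pass or the other, which is why the trailing EXPECT_TRUE(seen3) assertions are dropped. A minimal usage sketch, with disable_blocking and cq standing in for GetParam().disable_blocking and cq_.get():

bool seen3 = false;
// First pass: tag 3 may or may not complete here; if it does, seen3 becomes true.
Verifier(disable_blocking).Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq);
// Second pass: require tag 3 only if the first pass did not already retire it.
Verifier(disable_blocking).ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq);
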
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 7fbaf63492..abf755b393 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -226,6 +226,7 @@ class Client {
}
virtual void DestroyMultithreading() = 0;
+ virtual void InitThreadFunc(size_t thread_idx) = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
@@ -299,13 +300,18 @@ class Client {
Thread& operator=(const Thread&);
void ThreadFunc() {
+ int wait_loop = 0;
while (!gpr_event_wait(
&client_->start_requests_,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(1, GPR_TIMESPAN)))) {
- gpr_log(GPR_INFO, "Waiting for benchmark to start");
+ gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
+ gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
+ idx_, wait_loop);
+ wait_loop++;
}
+ client_->InitThreadFunc(idx_);
+
for (;;) {
// run the loop body
HistogramEntry entry;
@@ -380,6 +386,13 @@ class ClientImpl : public Client {
config.server_targets(i % config.server_targets_size()), config,
create_stub_, i);
}
+ std::vector<std::unique_ptr<std::thread>> connecting_threads;
+ for (auto& c : channels_) {
+ connecting_threads.emplace_back(c.WaitForReady());
+ }
+ for (auto& t : connecting_threads) {
+ t->join();
+ }
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
@@ -414,14 +427,19 @@ class ClientImpl : public Client {
!config.security_params().use_test_ca(),
std::shared_ptr<CallCredentials>(), args);
gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
- GPR_ASSERT(channel_->WaitForConnected(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(300, GPR_TIMESPAN))));
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
+ std::unique_ptr<std::thread> WaitForReady() {
+ return std::unique_ptr<std::thread>(new std::thread([this]() {
+ GPR_ASSERT(channel_->WaitForConnected(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(10, GPR_TIMESPAN))));
+ }));
+ }
+
private:
void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
for (auto channel_arg : config.channel_args()) {
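
Not part of the patch: the client.h changes above do two things. Worker threads now loop on a 20-second wait for the start event with progress logging and then call a new per-thread InitThreadFunc() hook, and channel setup fans out one thread per channel via WaitForReady() instead of blocking serially for up to 300 seconds in WaitForConnected. A minimal sketch of that fan-out/join shape, where Connectable and WaitUntilReady() are hypothetical stand-ins for ClientChannelInfo and channel_->WaitForConnected(...):

#include <memory>
#include <thread>
#include <vector>

struct Connectable {
  // Stand-in for waiting on a real channel to reach the READY state.
  bool WaitUntilReady() { return true; }
};

void ConnectAll(std::vector<Connectable>& channels) {
  std::vector<std::unique_ptr<std::thread>> connecting_threads;
  for (auto& c : channels) {
    // One waiter per channel, so one slow endpoint does not serialize the rest.
    connecting_threads.emplace_back(
        new std::thread([&c]() { c.WaitUntilReady(); }));
  }
  for (auto& t : connecting_threads) {
    t->join();  // every channel has finished its wait past this point
  }
}
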
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f5807da81e..9ed4e0b355 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,6 +236,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
+ void InitThreadFunc(size_t thread_idx) override final {}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final {
void* got_tag;
bool ok;
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 5d212f1acc..94554a46b2 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -103,6 +103,8 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
+ void InitThreadFunc(size_t thread_idx) override {}
+
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
@@ -174,13 +176,7 @@ class SynchronousStreamingPingPongClient final
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
public:
SynchronousStreamingPingPongClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
- messages_issued_[thread_idx] = 0;
- }
- }
+ : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
@@ -196,6 +192,12 @@ class SynchronousStreamingPingPongClient final
}
}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
+ }
+
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
@@ -228,14 +230,7 @@ class SynchronousStreamingFromClientClient final
: public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
public:
SynchronousStreamingFromClientClient(const ClientConfig& config)
- : SynchronousStreamingClient(config), last_issue_(num_threads_) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
- last_issue_[thread_idx] = UsageTimer::Now();
- }
- }
+ : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
@@ -251,6 +246,13 @@ class SynchronousStreamingFromClientClient final
}
}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ last_issue_[thread_idx] = UsageTimer::Now();
+ }
+
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
// Figure out how to make histogram sensible if this is rate-paced
if (!WaitToIssue(thread_idx)) {
@@ -279,13 +281,12 @@ class SynchronousStreamingFromServerClient final
: public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
- : SynchronousStreamingClient(config), last_recv_(num_threads_) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
- last_recv_[thread_idx] = UsageTimer::Now();
- }
+ : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ last_recv_[thread_idx] = UsageTimer::Now();
}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
@@ -311,12 +312,7 @@ class SynchronousStreamingBothWaysClient final
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
public:
SynchronousStreamingBothWaysClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
- }
- }
+ : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
@@ -332,6 +328,10 @@ class SynchronousStreamingBothWaysClient final
}
}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ }
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
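
Not part of the patch: across all four synchronous streaming clients the change is the same. The constructors no longer open streams eagerly; each worker thread instead opens its own stream in InitThreadFunc() once the start event has fired, then spins in ThreadFunc(). A minimal sketch of that lifecycle, with WorkerClient, OpenStream, and IssueOneCall as hypothetical stand-ins for the real stubs and RPCs:

#include <cstddef>
#include <thread>
#include <vector>

class WorkerClient {
 public:
  explicit WorkerClient(size_t num_threads) : streams_(num_threads) {}

  void Run() {
    std::vector<std::thread> workers;
    for (size_t idx = 0; idx < streams_.size(); idx++) {
      workers.emplace_back([this, idx]() {
        InitThreadFunc(idx);        // per-thread setup, after the start signal
        while (ThreadFunc(idx)) {}  // steady-state benchmark loop
      });
    }
    for (auto& w : workers) w.join();
  }

 private:
  void InitThreadFunc(size_t idx) { streams_[idx] = OpenStream(idx); }
  bool ThreadFunc(size_t idx) { return IssueOneCall(streams_[idx]); }

  int OpenStream(size_t idx) { return static_cast<int>(idx); }  // stand-in stream handle
  bool IssueOneCall(int) { return false; }  // stand-in: report "done" after one pass

  std::vector<int> streams_;
};

int main() {
  WorkerClient client(4);  // e.g. four worker threads
  client.Run();
  return 0;
}
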