diff options
author | Craig Tiller <ctiller@google.com> | 2017-10-10 22:14:56 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-10-10 22:14:56 -0700 |
commit | 5d38ac60b84dfebc20d142501e8b55e3154c8d65 (patch) | |
tree | 8708ad4ad3c49762e2cc1a7ad7eded9f52bdc077 /test | |
parent | b8b68761aed8053d894b04a56e6a3e158fed801d (diff) | |
parent | 0a64225aab8b8b66890839cd04655ca96f86d59a (diff) |
Merge branch 'timer' into pid++
Diffstat (limited to 'test')
-rw-r--r-- | test/core/surface/init_test.c | 2 | ||||
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 32 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 28 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 1 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 56 |
5 files changed, 72 insertions, 47 deletions
diff --git a/test/core/surface/init_test.c b/test/core/surface/init_test.c index a9e80575af..b835a2a884 100644 --- a/test/core/surface/init_test.c +++ b/test/core/surface/init_test.c @@ -53,7 +53,7 @@ static void test_plugin() { } static void test_repeatedly() { - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 1000; i++) { grpc_init(); grpc_shutdown(); } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index f938aea40e..a14b4d5295 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -102,14 +102,23 @@ class Verifier { explicit Verifier(bool spin) : spin_(spin) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { - expectations_[tag(i)] = expect_ok; + return ExpectUnless(i, expect_ok, false); + } + // ExpectUnless sets the expected ok value for a specific tag + // unless the tag was already marked seen (as a result of ExpectMaybe) + Verifier& ExpectUnless(int i, bool expect_ok, bool seen) { + if (!seen) { + expectations_[tag(i)] = expect_ok; + } return *this; } - // AcceptOnce sets the expected ok value for a specific tag, but does not + // ExpectMaybe sets the expected ok value for a specific tag, but does not // require it to appear // If it does, sets *seen to true - Verifier& AcceptOnce(int i, bool expect_ok, bool* seen) { - maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen}; + Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) { + if (!*seen) { + maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen}; + } return *this; } @@ -569,13 +578,13 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { Verifier(GetParam().disable_blocking) .Expect(2, true) - .AcceptOnce(3, true, &seen3) + .ExpectMaybe(3, true, &seen3) .Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); Verifier(GetParam().disable_blocking) - .AcceptOnce(3, true, &seen3) + .ExpectUnless(3, true, seen3) .Expect(4, true) .Verify(cq_.get()); @@ -602,7 +611,6 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); - EXPECT_TRUE(seen3); } // One ping, two pongs. @@ -853,13 +861,13 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { Verifier(GetParam().disable_blocking) .Expect(2, true) - .AcceptOnce(3, true, &seen3) + .ExpectMaybe(3, true, &seen3) .Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); Verifier(GetParam().disable_blocking) - .AcceptOnce(3, true, &seen3) + .ExpectUnless(3, true, seen3) .Expect(4, true) .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -880,7 +888,6 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); - EXPECT_TRUE(seen3); } // One ping, one pong. Using server:WriteLast api @@ -910,13 +917,13 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { Verifier(GetParam().disable_blocking) .Expect(2, true) - .AcceptOnce(3, true, &seen3) + .ExpectMaybe(3, true, &seen3) .Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); Verifier(GetParam().disable_blocking) - .AcceptOnce(3, true, &seen3) + .ExpectUnless(3, true, seen3) .Expect(4, true) .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -939,7 +946,6 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); - EXPECT_TRUE(seen3); } // Metadata tests diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 7fbaf63492..abf755b393 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -226,6 +226,7 @@ class Client { } virtual void DestroyMultithreading() = 0; + virtual void InitThreadFunc(size_t thread_idx) = 0; virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { @@ -299,13 +300,18 @@ class Client { Thread& operator=(const Thread&); void ThreadFunc() { + int wait_loop = 0; while (!gpr_event_wait( &client_->start_requests_, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(1, GPR_TIMESPAN)))) { - gpr_log(GPR_INFO, "Waiting for benchmark to start"); + gpr_time_from_seconds(20, GPR_TIMESPAN)))) { + gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)", + idx_, wait_loop); + wait_loop++; } + client_->InitThreadFunc(idx_); + for (;;) { // run the loop body HistogramEntry entry; @@ -380,6 +386,13 @@ class ClientImpl : public Client { config.server_targets(i % config.server_targets_size()), config, create_stub_, i); } + std::vector<std::unique_ptr<std::thread>> connecting_threads; + for (auto& c : channels_) { + connecting_threads.emplace_back(c.WaitForReady()); + } + for (auto& t : connecting_threads) { + t->join(); + } ClientRequestCreator<RequestType> create_req(&request_, config.payload_config()); @@ -414,14 +427,19 @@ class ClientImpl : public Client { !config.security_params().use_test_ca(), std::shared_ptr<CallCredentials>(), args); gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); - GPR_ASSERT(channel_->WaitForConnected( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(300, GPR_TIMESPAN)))); stub_ = create_stub(channel_); } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } + std::unique_ptr<std::thread> WaitForReady() { + return std::unique_ptr<std::thread>(new std::thread([this]() { + GPR_ASSERT(channel_->WaitForConnected( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(10, GPR_TIMESPAN)))); + })); + } + private: void set_channel_args(const ClientConfig& config, ChannelArguments* args) { for (auto channel_arg : config.channel_args()) { diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f5807da81e..9ed4e0b355 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,6 +236,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { this->EndThreads(); // this needed for resolution } + void InitThreadFunc(size_t thread_idx) override final {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { void* got_tag; bool ok; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 5d212f1acc..94554a46b2 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -103,6 +103,8 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} + void InitThreadFunc(size_t thread_idx) override {} + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; @@ -174,13 +176,7 @@ class SynchronousStreamingPingPongClient final grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { public: SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); - messages_issued_[thread_idx] = 0; - } - } + : SynchronousStreamingClient(config) {} ~SynchronousStreamingPingPongClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -196,6 +192,12 @@ class SynchronousStreamingPingPongClient final } } + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + messages_issued_[thread_idx] = 0; + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; @@ -228,14 +230,7 @@ class SynchronousStreamingFromClientClient final : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> { public: SynchronousStreamingFromClientClient(const ClientConfig& config) - : SynchronousStreamingClient(config), last_issue_(num_threads_) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); - last_issue_[thread_idx] = UsageTimer::Now(); - } - } + : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -251,6 +246,13 @@ class SynchronousStreamingFromClientClient final } } + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + last_issue_[thread_idx] = UsageTimer::Now(); + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { @@ -279,13 +281,12 @@ class SynchronousStreamingFromServerClient final : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> { public: SynchronousStreamingFromServerClient(const ClientConfig& config) - : SynchronousStreamingClient(config), last_recv_(num_threads_) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); - last_recv_[thread_idx] = UsageTimer::Now(); - } + : SynchronousStreamingClient(config), last_recv_(num_threads_) {} + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + last_recv_[thread_idx] = UsageTimer::Now(); } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); @@ -311,12 +312,7 @@ class SynchronousStreamingBothWaysClient final grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { public: SynchronousStreamingBothWaysClient(const ClientConfig& config) - : SynchronousStreamingClient(config) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); - } - } + : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -332,6 +328,10 @@ class SynchronousStreamingBothWaysClient final } } + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; |