diff options
author | yang-g <yangg@google.com> | 2015-07-06 14:05:54 -0700 |
---|---|---|
committer | yang-g <yangg@google.com> | 2015-07-06 14:05:54 -0700 |
commit | 5ea46ab2482c3724fbc7fd0aab55f324fb65999c (patch) | |
tree | 55eebc4aae8f06f931c8f75ddf84d56595f99fa1 /test/cpp | |
parent | 3abe60b9d08ff5a784a39f7c4a10c631547c3526 (diff) | |
parent | d426864934ac60f46e538ba81932e405fa8949b1 (diff) |
merge with upstream and resolve conflicts
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 230 | ||||
-rw-r--r-- | test/cpp/end2end/client_crash_test.cc | 29 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 39 | ||||
-rw-r--r-- | test/cpp/end2end/generic_end2end_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 28 | ||||
-rw-r--r-- | test/cpp/qps/perf_db.proto | 71 | ||||
-rw-r--r-- | test/cpp/qps/perf_db_client.cc | 143 | ||||
-rw-r--r-- | test/cpp/qps/perf_db_client.h | 115 | ||||
-rw-r--r-- | test/cpp/qps/qps_test_openloop.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/qps_test_with_poll.cc | 90 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/report.cc | 90 | ||||
-rw-r--r-- | test/cpp/qps/report.h | 54 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 35 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 2 | ||||
-rw-r--r-- | test/cpp/util/benchmark_config.cc | 18 | ||||
-rw-r--r-- | test/cpp/util/byte_buffer_test.cc | 115 | ||||
-rw-r--r-- | test/cpp/util/grpc_cli.cc | 2 | ||||
-rw-r--r-- | test/cpp/util/slice_test.cc | 77 |
19 files changed, 974 insertions, 170 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 871f956491..117d8bb9fa 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -67,27 +67,45 @@ namespace { void* tag(int i) { return (void*)(gpr_intptr) i; } -void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { - bool ok; - void* got_tag; - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - EXPECT_EQ(expect_ok, ok); - EXPECT_EQ(tag(i), got_tag); -} - -void verify_timed_ok( - CompletionQueue* cq, int i, bool expect_ok, - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::time_point::max(), - CompletionQueue::NextStatus expected_outcome = CompletionQueue::GOT_EVENT) { - bool ok; - void* got_tag; - EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome); - if (expected_outcome == CompletionQueue::GOT_EVENT) { - EXPECT_EQ(expect_ok, ok); - EXPECT_EQ(tag(i), got_tag); +class Verifier { + public: + Verifier& Expect(int i, bool expect_ok) { + expectations_[tag(i)] = expect_ok; + return *this; } -} + void Verify(CompletionQueue *cq) { + GPR_ASSERT(!expectations_.empty()); + while (!expectations_.empty()) { + bool ok; + void* got_tag; + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + EXPECT_EQ(it->second, ok); + expectations_.erase(it); + } + } + void Verify(CompletionQueue *cq, std::chrono::system_clock::time_point deadline) { + if (expectations_.empty()) { + bool ok; + void *got_tag; + EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), CompletionQueue::TIMEOUT); + } else { + while (!expectations_.empty()) { + bool ok; + void *got_tag; + EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), CompletionQueue::GOT_EVENT); + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + EXPECT_EQ(it->second, ok); + expectations_.erase(it); + } + } + } + + private: + std::map<void*, bool> expectations_; +}; class AsyncEnd2endTest : public ::testing::Test { protected: @@ -100,7 +118,7 @@ class AsyncEnd2endTest : public ::testing::Test { ServerBuilder builder; builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials()); builder.RegisterAsyncService(&service_); - srv_cq_ = builder.AddCompletionQueue(); + cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } @@ -108,11 +126,8 @@ class AsyncEnd2endTest : public ::testing::Test { server_->Shutdown(); void* ignored_tag; bool ignored_ok; - cli_cq_.Shutdown(); - srv_cq_->Shutdown(); - while (cli_cq_.Next(&ignored_tag, &ignored_ok)) - ; - while (srv_cq_->Next(&ignored_tag, &ignored_ok)) + cq_->Shutdown(); + while (cq_->Next(&ignored_tag, &ignored_ok)) ; } @@ -122,11 +137,6 @@ class AsyncEnd2endTest : public ::testing::Test { stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); } - void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } - void client_ok(int i) { verify_ok(&cli_cq_, i, true); } - void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } - void client_fail(int i) { verify_ok(&cli_cq_, i, false); } - void SendRpc(int num_rpcs) { for (int i = 0; i < num_rpcs; i++) { EchoRequest send_request; @@ -141,28 +151,27 @@ class AsyncEnd2endTest : public ::testing::Test { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, - srv_cq_.get(), srv_cq_.get(), tag(2)); + cq_.get(), cq_.get(), tag(2)); - server_ok(2); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } } - CompletionQueue cli_cq_; - std::unique_ptr<ServerCompletionQueue> srv_cq_; + std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<Server> server_; grpc::cpp::test::util::TestService::AsyncService service_; @@ -195,27 +204,27 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); std::chrono::system_clock::time_point time_now( std::chrono::system_clock::now()); std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::now() + std::chrono::seconds(10)); - verify_timed_ok(srv_cq_.get(), -1, true, time_now, CompletionQueue::TIMEOUT); - verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); + Verifier().Verify(cq_.get(), time_now); + Verifier().Verify(cq_.get(), time_now); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); - verify_timed_ok(srv_cq_.get(), 2, true, time_limit); + Verifier().Expect(2, true).Verify(cq_.get(), time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - verify_timed_ok(srv_cq_.get(), 3, true); + Verifier().Expect(3, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - verify_timed_ok(&cli_cq_, 4, true); + Verifier().Expect(4, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -236,40 +245,39 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( - stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, srv_cq_.get(), - srv_cq_.get(), tag(2)); + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), + cq_.get(), tag(2)); - server_ok(2); - client_ok(1); + Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - client_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); - server_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - client_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(6)); - server_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - client_ok(7); + Verifier().Expect(7, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(8)); - server_fail(8); + Verifier().Expect(8, false).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - server_ok(9); + Verifier().Expect(9, true).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - client_ok(10); + Verifier().Expect(10, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -290,38 +298,37 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( - stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - srv_cq_.get(), srv_cq_.get(), tag(2)); + cq_.get(), cq_.get(), tag(2)); - server_ok(2); - client_ok(1); + Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - server_ok(7); + Verifier().Expect(7, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(8)); - client_fail(8); + Verifier().Expect(8, false).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); - client_ok(9); + Verifier().Expect(9, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -341,40 +348,39 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > - cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1))); + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, srv_cq_.get(), - srv_cq_.get(), tag(2)); + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), + cq_.get(), tag(2)); - server_ok(2); - client_ok(1); + Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - client_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); - server_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - client_ok(7); + Verifier().Expect(7, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(8)); - server_fail(8); + Verifier().Expect(8, false).Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - server_ok(9); + Verifier().Expect(9, true).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - client_ok(10); + Verifier().Expect(10, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -400,11 +406,11 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { cli_ctx.AddMetadata(meta2.first, meta2.second); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); @@ -414,10 +420,10 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -441,19 +447,19 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second); response_writer.SendInitialMetadata(tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second); @@ -461,10 +467,10 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -488,24 +494,24 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); - server_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(5)); - client_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -548,11 +554,11 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { cli_ctx.AddMetadata(meta2.first, meta2.second); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); @@ -562,9 +568,9 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); response_writer.SendInitialMetadata(tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); @@ -575,10 +581,10 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc index aeba295947..645226f375 100644 --- a/test/cpp/end2end/client_crash_test.cc +++ b/test/cpp/end2end/client_crash_test.cc @@ -90,15 +90,13 @@ class CrashTest : public ::testing::Test { void KillServer() { server_.reset(); - // give some time for the TCP connection to drop - gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(1))); } private: std::unique_ptr<SubProcess> server_; }; -TEST_F(CrashTest, KillAfterWrite) { +TEST_F(CrashTest, KillBeforeWrite) { auto stub = CreateServerAndStub(); EchoRequest request; @@ -112,17 +110,18 @@ TEST_F(CrashTest, KillAfterWrite) { EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message()); - request.set_message("I'm going to kill you"); - EXPECT_TRUE(stream->Write(request)); - KillServer(); + request.set_message("You should be dead"); + // This may succeed or fail depending on the state of the TCP connection + stream->Write(request); + // But the read will definitely fail EXPECT_FALSE(stream->Read(&response)); EXPECT_FALSE(stream->Finish().ok()); } -TEST_F(CrashTest, KillBeforeWrite) { +TEST_F(CrashTest, KillAfterWrite) { auto stub = CreateServerAndStub(); EchoRequest request; @@ -136,11 +135,13 @@ TEST_F(CrashTest, KillBeforeWrite) { EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message()); + request.set_message("I'm going to kill you"); + EXPECT_TRUE(stream->Write(request)); + KillServer(); - request.set_message("You should be dead"); - EXPECT_FALSE(stream->Write(request)); - EXPECT_FALSE(stream->Read(&response)); + // This may succeed or fail depending on how quick the server was + stream->Read(&response); EXPECT_FALSE(stream->Finish().ok()); } @@ -161,5 +162,11 @@ int main(int argc, char** argv) { grpc_test_init(argc, argv); ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); + // Order seems to matter on these tests: run three times to eliminate that + for (int i = 0; i < 3; i++) { + if (RUN_ALL_TESTS() != 0) { + return 1; + } + } + return 0; } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 45ba8b0878..5e850ea30a 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -68,6 +68,8 @@ namespace testing { namespace { +const char* kServerCancelAfterReads = "cancel_after_reads"; + // When echo_deadline is requested, deadline seen in the ServerContext is set in // the response in seconds. void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, @@ -131,7 +133,23 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { EchoResponse* response) GRPC_OVERRIDE { EchoRequest request; response->set_message(""); + int cancel_after_reads = 0; + const std::multimap<grpc::string, grpc::string> client_initial_metadata = + context->client_metadata(); + if (client_initial_metadata.find(kServerCancelAfterReads) != + client_initial_metadata.end()) { + std::istringstream iss( + client_initial_metadata.find(kServerCancelAfterReads)->second); + iss >> cancel_after_reads; + gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + } while (reader->Read(&request)) { + if (cancel_after_reads == 1) { + gpr_log(GPR_INFO, "return cancel status"); + return Status::CANCELLED; + } else if (cancel_after_reads > 0) { + cancel_after_reads--; + } response->mutable_message()->append(request.message()); } return Status::OK; @@ -687,6 +705,27 @@ TEST_F(End2endTest, OverridePerCallCredentials) { EXPECT_TRUE(s.ok()); } +// Client sends 20 requests and the server returns CANCELLED status after +// reading 10 requests. +TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerCancelAfterReads, "10"); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + int send_messages = 20; + while (send_messages > 0) { + EXPECT_TRUE(stream->Write(request)); + send_messages--; + } + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(s.error_code(), StatusCode::CANCELLED); +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 7132b6b1f1..b9d47b32de 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -33,10 +33,10 @@ #include <memory> -#include "src/cpp/proto/proto_utils.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/util/echo.grpc.pb.h" +#include <grpc++/impl/proto_utils.h> #include <grpc++/async_generic_service.h> #include <grpc++/async_unary_call.h> #include <grpc++/byte_buffer.h> diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1b7a8d26b2..e1e44f9ac0 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -168,7 +168,7 @@ class AsyncClient : public Client { if (!closed_loop_) { rpc_deadlines_.emplace_back(); next_channel_.push_back(i % channel_count_); - issue_allowed_.push_back(true); + issue_allowed_.emplace_back(true); grpc_time next_issue; NextIssueTime(i, &next_issue); @@ -199,6 +199,15 @@ class AsyncClient : public Client { delete ClientRpcContext::detag(got_tag); } } + // Now clear out all the pre-allocated idle contexts + for (int ch = 0; ch < channel_count_; ch++) { + while (!contexts_[ch].empty()) { + // Get an idle context from the front of the list + auto* ctx = *(contexts_[ch].begin()); + contexts_[ch].pop_front(); + delete ctx; + } + } } bool ThreadFunc(Histogram* histogram, @@ -234,12 +243,6 @@ class AsyncClient : public Client { GPR_ASSERT(false); break; } - if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && - grpc_time_source::now() > deadline) { - // we have missed some 1-second deadline, which is worth noting - gpr_log(GPR_INFO, "Missed an RPC deadline"); - // Don't give up, as there might be some truly heavy tails - } if (got_event) { ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); if (ctx->RunNextState(ok, histogram) == false) { @@ -313,11 +316,20 @@ class AsyncClient : public Client { } private: + class boolean { // exists only to avoid data-race on vector<bool> + public: + boolean(): val_(false) {} + boolean(bool b): val_(b) {} + operator bool() const {return val_;} + boolean& operator=(bool b) {val_=b; return *this;} + private: + bool val_; + }; std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<deadline_list> rpc_deadlines_; // per thread deadlines std::vector<int> next_channel_; // per thread round-robin channel ctr - std::vector<bool> issue_allowed_; // may this thread attempt to issue + std::vector<boolean> issue_allowed_; // may this thread attempt to issue std::vector<grpc_time> next_issue_; // when should it issue? std::vector<std::mutex> channel_lock_; diff --git a/test/cpp/qps/perf_db.proto b/test/cpp/qps/perf_db.proto new file mode 100644 index 0000000000..60e038406a --- /dev/null +++ b/test/cpp/qps/perf_db.proto @@ -0,0 +1,71 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +import "test/cpp/qps/qpstest.proto"; + +package grpc.testing; + +service PerfDbTransfer { + // Sends client info + rpc RecordSingleClientData(SingleUserRecordRequest) + returns (SingleUserRecordReply) { + } +} + +// Metrics to be stored +message Metrics { + double qps = 1; + double qps_per_core = 2; + double perc_lat_50 = 3; + double perc_lat_90 = 4; + double perc_lat_95 = 5; + double perc_lat_99 = 6; + double perc_lat_99_point_9 = 7; + double server_system_time = 8; + double server_user_time = 9; + double client_system_time = 10; + double client_user_time = 11; +} + +// Request for storing a single user's data +message SingleUserRecordRequest { + string hashed_id = 1; + string test_name = 2; + string sys_info = 3; + string tag = 4; + Metrics metrics = 5; + ClientConfig client_config = 6; + ServerConfig server_config = 7; +} + +// Reply to request for storing single user's data +message SingleUserRecordReply { +} diff --git a/test/cpp/qps/perf_db_client.cc b/test/cpp/qps/perf_db_client.cc new file mode 100644 index 0000000000..08d20f0b8d --- /dev/null +++ b/test/cpp/qps/perf_db_client.cc @@ -0,0 +1,143 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/perf_db_client.h" + +namespace grpc { +namespace testing { + +// sets the client and server config information +void PerfDbClient::setConfigs(const ClientConfig& client_config, + const ServerConfig& server_config) { + client_config_ = client_config; + server_config_ = server_config; +} + +// sets the QPS +void PerfDbClient::setQps(double qps) { + qps_ = qps; +} + +// sets the QPS per core +void PerfDbClient::setQpsPerCore(double qps_per_core) { + qps_per_core_ = qps_per_core; +} + +// sets the 50th, 90th, 95th, 99th and 99.9th percentile latency +void PerfDbClient::setLatencies(double perc_lat_50, + double perc_lat_90, + double perc_lat_95, + double perc_lat_99, + double perc_lat_99_point_9) { + perc_lat_50_ = perc_lat_50; + perc_lat_90_ = perc_lat_90; + perc_lat_95_ = perc_lat_95; + perc_lat_99_ = perc_lat_99; + perc_lat_99_point_9_ = perc_lat_99_point_9; +} + +// sets the server and client, user and system times +void PerfDbClient::setTimes(double server_system_time, double server_user_time, + double client_system_time, double client_user_time) { + server_system_time_ = server_system_time; + server_user_time_ = server_user_time; + client_system_time_ = client_system_time; + client_user_time_ = client_user_time; +} + +// sends the data to the performance database server +bool PerfDbClient::sendData(std::string hashed_id, std::string test_name, + std::string sys_info, std::string tag) { + // Data record request object + SingleUserRecordRequest single_user_record_request; + + // setting access token, name of the test and the system information + single_user_record_request.set_hashed_id(hashed_id); + single_user_record_request.set_test_name(test_name); + single_user_record_request.set_sys_info(sys_info); + single_user_record_request.set_tag(tag); + + // setting configs + *(single_user_record_request.mutable_client_config()) = client_config_; + *(single_user_record_request.mutable_server_config()) = server_config_; + + Metrics* metrics = single_user_record_request.mutable_metrics(); + + // setting metrcs in data record request + if (qps_ != DBL_MIN) { + metrics->set_qps(qps_); + } + if (qps_per_core_ != DBL_MIN) { + metrics->set_qps_per_core(qps_per_core_); + } + if (perc_lat_50_ != DBL_MIN) { + metrics->set_perc_lat_50(perc_lat_50_); + } + if (perc_lat_90_ != DBL_MIN) { + metrics->set_perc_lat_90(perc_lat_90_); + } + if (perc_lat_95_ != DBL_MIN) { + metrics->set_perc_lat_95(perc_lat_95_); + } + if (perc_lat_99_ != DBL_MIN) { + metrics->set_perc_lat_99(perc_lat_99_); + } + if (perc_lat_99_point_9_ != DBL_MIN) { + metrics->set_perc_lat_99_point_9(perc_lat_99_point_9_); + } + if (server_system_time_ != DBL_MIN) { + metrics->set_server_system_time(server_system_time_); + } + if (server_user_time_ != DBL_MIN) { + metrics->set_server_user_time(server_user_time_); + } + if (client_system_time_ != DBL_MIN) { + metrics->set_client_system_time(client_system_time_); + } + if (client_user_time_ != DBL_MIN) { + metrics->set_client_user_time(client_user_time_); + } + + SingleUserRecordReply single_user_record_reply; + ClientContext context; + + Status status = stub_->RecordSingleClientData( + &context, single_user_record_request, &single_user_record_reply); + if (status.ok()) { + return true; // data sent to database successfully + } else { + return false; // error in data sending + } +} +} // testing +} // grpc diff --git a/test/cpp/qps/perf_db_client.h b/test/cpp/qps/perf_db_client.h new file mode 100644 index 0000000000..ce7a88bbff --- /dev/null +++ b/test/cpp/qps/perf_db_client.h @@ -0,0 +1,115 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <iostream> +#include <memory> +#include <string> +#include <cfloat> + +#include <grpc/grpc.h> +#include <grpc++/channel_arguments.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/credentials.h> +#include <grpc++/status.h> +#include "test/cpp/qps/perf_db.grpc.pb.h" + +namespace grpc { +namespace testing { + +// Manages data sending to performance database server +class PerfDbClient { + public: + PerfDbClient() { + qps_ = DBL_MIN; + qps_per_core_ = DBL_MIN; + perc_lat_50_ = DBL_MIN; + perc_lat_90_ = DBL_MIN; + perc_lat_95_ = DBL_MIN; + perc_lat_99_ = DBL_MIN; + perc_lat_99_point_9_ = DBL_MIN; + server_system_time_ = DBL_MIN; + server_user_time_ = DBL_MIN; + client_system_time_ = DBL_MIN; + client_user_time_ = DBL_MIN; + } + + void init(std::shared_ptr<ChannelInterface> channel) { + stub_ = PerfDbTransfer::NewStub(channel); + } + + ~PerfDbClient() {} + + // sets the client and server config information + void setConfigs(const ClientConfig& client_config, + const ServerConfig& server_config); + + // sets the qps + void setQps(double qps); + + // sets the qps per core + void setQpsPerCore(double qps_per_core); + + // sets the 50th, 90th, 95th, 99th and 99.9th percentile latency + void setLatencies(double perc_lat_50, double perc_lat_90, + double perc_lat_95, double perc_lat_99, + double perc_lat_99_point_9); + + // sets the server and client, user and system times + void setTimes(double server_system_time, double server_user_time, + double client_system_time, double client_user_time); + + // sends the data to the performance database server + bool sendData(std::string hashed_id, std::string test_name, + std::string sys_info, std::string tag); + + private: + std::unique_ptr<PerfDbTransfer::Stub> stub_; + ClientConfig client_config_; + ServerConfig server_config_; + double qps_; + double qps_per_core_; + double perc_lat_50_; + double perc_lat_90_; + double perc_lat_95_; + double perc_lat_99_; + double perc_lat_99_point_9_; + double server_system_time_; + double server_user_time_; + double client_system_time_; + double client_user_time_; +}; + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/qps_test_openloop.cc b/test/cpp/qps/qps_test_openloop.cc index 52873b2987..96a9b4504c 100644 --- a/test/cpp/qps/qps_test_openloop.cc +++ b/test/cpp/qps/qps_test_openloop.cc @@ -60,7 +60,7 @@ static void RunQPS() { client_config.set_rpc_type(UNARY); client_config.set_load_type(POISSON); client_config.mutable_load_params()-> - mutable_poisson()->set_offered_load(10000.0); + mutable_poisson()->set_offered_load(1000.0); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc new file mode 100644 index 0000000000..90a8da8d11 --- /dev/null +++ b/test/cpp/qps/qps_test_with_poll.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include <signal.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +extern "C" { +#include "src/core/iomgr/pollset_posix.h" +} + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 5; + +static void RunQPS() { + gpr_log(GPR_INFO, "Running QPS test"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_enable_ssl(false); + client_config.set_outstanding_rpcs_per_channel(1000); + client_config.set_client_channels(8); + client_config.set_payload_size(1); + client_config.set_async_client_threads(8); + client_config.set_rpc_type(UNARY); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_enable_ssl(false); + server_config.set_threads(4); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPSPerCore(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc_platform_become_multipoller = grpc_poll_become_multipoller; + + signal(SIGPIPE, SIG_IGN); + grpc::testing::RunQPS(); + + return 0; +} diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 423275ee85..f1cea5ee66 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -31,7 +31,7 @@ * */ -#include "qps_worker.h" +#include "test/cpp/qps/qps_worker.h" #include <cassert> #include <memory> diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index 678ea080d1..ff01ec1501 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -43,49 +43,47 @@ void CompositeReporter::add(std::unique_ptr<Reporter> reporter) { reporters_.emplace_back(std::move(reporter)); } -void CompositeReporter::ReportQPS(const ScenarioResult& result) const { +void CompositeReporter::ReportQPS(const ScenarioResult& result) { for (size_t i = 0; i < reporters_.size(); ++i) { reporters_[i]->ReportQPS(result); } } -void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result) const { +void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result) { for (size_t i = 0; i < reporters_.size(); ++i) { reporters_[i]->ReportQPSPerCore(result); } } -void CompositeReporter::ReportLatency(const ScenarioResult& result) const { +void CompositeReporter::ReportLatency(const ScenarioResult& result) { for (size_t i = 0; i < reporters_.size(); ++i) { reporters_[i]->ReportLatency(result); } } -void CompositeReporter::ReportTimes(const ScenarioResult& result) const { +void CompositeReporter::ReportTimes(const ScenarioResult& result) { for (size_t i = 0; i < reporters_.size(); ++i) { reporters_[i]->ReportTimes(result); } } - -void GprLogReporter::ReportQPS(const ScenarioResult& result) const { +void GprLogReporter::ReportQPS(const ScenarioResult& result) { gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / average(result.client_resources, [](ResourceUsage u) { return u.wall_time; })); } -void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) const { - auto qps = - result.latencies.Count() / - average(result.client_resources, - [](ResourceUsage u) { return u.wall_time; }); +void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { + auto qps = result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; }); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, qps / result.server_config.threads()); } -void GprLogReporter::ReportLatency(const ScenarioResult& result) const { +void GprLogReporter::ReportLatency(const ScenarioResult& result) { gpr_log(GPR_INFO, "Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us", result.latencies.Percentile(50) / 1000, @@ -95,7 +93,7 @@ void GprLogReporter::ReportLatency(const ScenarioResult& result) const { result.latencies.Percentile(99.9) / 1000); } -void GprLogReporter::ReportTimes(const ScenarioResult& result) const { +void GprLogReporter::ReportTimes(const ScenarioResult& result) { gpr_log(GPR_INFO, "Server system time: %.2f%%", 100.0 * sum(result.server_resources, [](ResourceUsage u) { return u.system_time; }) / @@ -118,5 +116,71 @@ void GprLogReporter::ReportTimes(const ScenarioResult& result) const { [](ResourceUsage u) { return u.wall_time; })); } +void PerfDbReporter::ReportQPS(const ScenarioResult& result) { + auto qps = result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; }); + + perf_db_client_.setQps(qps); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) { + auto qps = result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; }); + + auto qpsPerCore = qps / result.server_config.threads(); + + perf_db_client_.setQps(qps); + perf_db_client_.setQpsPerCore(qpsPerCore); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::ReportLatency(const ScenarioResult& result) { + perf_db_client_.setLatencies(result.latencies.Percentile(50) / 1000, + result.latencies.Percentile(90) / 1000, + result.latencies.Percentile(95) / 1000, + result.latencies.Percentile(99) / 1000, + result.latencies.Percentile(99.9) / 1000); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::ReportTimes(const ScenarioResult& result) { + double server_system_time = + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }); + double server_user_time = + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }); + double client_system_time = + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); + double client_user_time = + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); + + perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time, + client_user_time); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::SendData() { + // send data to performance database + bool data_state = + perf_db_client_.sendData(hashed_id_, test_name_, sys_info_, tag_); + + // check state of data sending + if (data_state) { + gpr_log(GPR_INFO, "Data sent to performance database successfully"); + } else { + gpr_log(GPR_INFO, "Data could not be sent to performance database"); + } +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h index 0cce08816a..aec3cbe80a 100644 --- a/test/cpp/qps/report.h +++ b/test/cpp/qps/report.h @@ -41,6 +41,7 @@ #include "test/cpp/qps/driver.h" #include "test/cpp/qps/qpstest.grpc.pb.h" +#include "test/cpp/qps/perf_db_client.h" namespace grpc { namespace testing { @@ -59,16 +60,16 @@ class Reporter { string name() const { return name_; } /** Reports QPS for the given \a result. */ - virtual void ReportQPS(const ScenarioResult& result) const = 0; + virtual void ReportQPS(const ScenarioResult& result) = 0; /** Reports QPS per core as (YYY/server core). */ - virtual void ReportQPSPerCore(const ScenarioResult& result) const = 0; + virtual void ReportQPSPerCore(const ScenarioResult& result) = 0; /** Reports latencies for the 50, 90, 95, 99 and 99.9 percentiles, in ms. */ - virtual void ReportLatency(const ScenarioResult& result) const = 0; + virtual void ReportLatency(const ScenarioResult& result) = 0; /** Reports system and user time for client and server systems. */ - virtual void ReportTimes(const ScenarioResult& result) const = 0; + virtual void ReportTimes(const ScenarioResult& result) = 0; private: const string name_; @@ -82,10 +83,10 @@ class CompositeReporter : public Reporter { /** Adds a \a reporter to the composite. */ void add(std::unique_ptr<Reporter> reporter); - void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportQPSPerCore(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE; + void ReportQPS(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportQPSPerCore(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportLatency(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportTimes(const ScenarioResult& result) GRPC_OVERRIDE; private: std::vector<std::unique_ptr<Reporter> > reporters_; @@ -97,10 +98,39 @@ class GprLogReporter : public Reporter { GprLogReporter(const string& name) : Reporter(name) {} private: - void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportQPSPerCore(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE; + void ReportQPS(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportQPSPerCore(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportLatency(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportTimes(const ScenarioResult& result) GRPC_OVERRIDE; +}; + +/** Reporter for performance database tool */ +class PerfDbReporter : public Reporter { + public: + PerfDbReporter(const string& name, const string& hashed_id, + const string& test_name, const string& sys_info, + const string& server_address, const string& tag) + : Reporter(name), + hashed_id_(hashed_id), + test_name_(test_name), + sys_info_(sys_info), + tag_(tag) { + perf_db_client_.init(grpc::CreateChannel( + server_address, grpc::InsecureCredentials(), ChannelArguments())); + } + ~PerfDbReporter() GRPC_OVERRIDE { SendData(); }; + + private: + PerfDbClient perf_db_client_; + std::string hashed_id_; + std::string test_name_; + std::string sys_info_; + std::string tag_; + void ReportQPS(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportQPSPerCore(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportLatency(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportTimes(const ScenarioResult& result) GRPC_OVERRIDE; + void SendData(); }; } // namespace testing diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 210aef4fd6..f5251e961b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -64,7 +64,7 @@ namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { + AsyncQpsServerTest(const ServerConfig &config, int port) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -97,6 +97,9 @@ class AsyncQpsServerTest : public Server { } } for (int i = 0; i < config.threads(); i++) { + shutdown_state_.emplace_back(new PerThreadShutdownState()); + } + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); - std::unique_lock<std::mutex> g(shutdown_mutex_); - if (!shutdown_) { + if (!shutdown_state_[i]->shutdown()) { // this RPC context is done, so refresh it if (!still_going) { - g.unlock(); ctx->Reset(); } } else { @@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server { } ~AsyncQpsServerTest() { server_->Shutdown(); - { - std::lock_guard<std::mutex> g(shutdown_mutex_); - shutdown_ = true; + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + (*ss)->set_shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); @@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server { TestService::AsyncService async_service_; std::forward_list<ServerRpcContext *> contexts_; - std::mutex shutdown_mutex_; - bool shutdown_; + class PerThreadShutdownState { + public: + PerThreadShutdownState() : shutdown_(false) {} + + bool shutdown() const { + std::lock_guard<std::mutex> lock(mutex_); + return shutdown_; + } + + void set_shutdown() { + std::lock_guard<std::mutex> lock(mutex_); + shutdown_ = true; + } + + private: + mutable std::mutex mutex_; + bool shutdown_; + }; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index dfc102fc17..14a8b0b089 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -40,7 +40,7 @@ #include <grpc/support/time.h> #include <gflags/gflags.h> -#include "qps_worker.h" +#include "test/cpp/qps/qps_worker.h" #include "test/cpp/util/test_config.h" DEFINE_int32(driver_port, 0, "Driver server port."); diff --git a/test/cpp/util/benchmark_config.cc b/test/cpp/util/benchmark_config.cc index 5b3c1daf5d..91fbbf9677 100644 --- a/test/cpp/util/benchmark_config.cc +++ b/test/cpp/util/benchmark_config.cc @@ -37,6 +37,18 @@ DEFINE_bool(enable_log_reporter, true, "Enable reporting of benchmark results through GprLog"); +DEFINE_bool(report_metrics_db, false, "True if metrics to be reported to performance database"); + +DEFINE_string(hashed_id, "", "Hash of the user id"); + +DEFINE_string(test_name, "", "Name of the test being executed"); + +DEFINE_string(sys_info, "", "System information"); + +DEFINE_string(server_address, "localhost:50052", "Address of the performance database server"); + +DEFINE_string(tag, "", "Optional tag for the test"); + // In some distros, gflags is in the namespace google, and in some others, // in gflags. This hack is enabling us to find both. namespace google {} @@ -57,6 +69,12 @@ static std::shared_ptr<Reporter> InitBenchmarkReporters() { composite_reporter->add( std::unique_ptr<Reporter>(new GprLogReporter("LogReporter"))); } + if(FLAGS_report_metrics_db) { + composite_reporter->add( + std::unique_ptr<Reporter>(new PerfDbReporter("PerfDbReporter", FLAGS_hashed_id, FLAGS_test_name, + FLAGS_sys_info, FLAGS_server_address, FLAGS_tag))); + } + return std::shared_ptr<Reporter>(composite_reporter); } diff --git a/test/cpp/util/byte_buffer_test.cc b/test/cpp/util/byte_buffer_test.cc new file mode 100644 index 0000000000..13eb49730a --- /dev/null +++ b/test/cpp/util/byte_buffer_test.cc @@ -0,0 +1,115 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/byte_buffer.h> + +#include <cstring> +#include <vector> + +#include <grpc/support/slice.h> +#include <grpc++/slice.h> +#include <gtest/gtest.h> + +namespace grpc { +namespace { + +const char* kContent1 = "hello xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; +const char* kContent2 = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy world"; + +class ByteBufferTest : public ::testing::Test { +}; + +TEST_F(ByteBufferTest, CreateFromSingleSlice) { + gpr_slice hello = gpr_slice_from_copied_string(kContent1); + Slice s(hello, Slice::STEAL_REF); + ByteBuffer buffer(&s, 1); +} + +TEST_F(ByteBufferTest, CreateFromVector) { + gpr_slice hello = gpr_slice_from_copied_string(kContent1); + gpr_slice world = gpr_slice_from_copied_string(kContent2); + std::vector<Slice> slices; + slices.push_back(Slice(hello, Slice::STEAL_REF)); + slices.push_back(Slice(world, Slice::STEAL_REF)); + ByteBuffer buffer(&slices[0], 2); +} + +TEST_F(ByteBufferTest, Clear) { + gpr_slice hello = gpr_slice_from_copied_string(kContent1); + Slice s(hello, Slice::STEAL_REF); + ByteBuffer buffer(&s, 1); + buffer.Clear(); +} + +TEST_F(ByteBufferTest, Length) { + gpr_slice hello = gpr_slice_from_copied_string(kContent1); + gpr_slice world = gpr_slice_from_copied_string(kContent2); + std::vector<Slice> slices; + slices.push_back(Slice(hello, Slice::STEAL_REF)); + slices.push_back(Slice(world, Slice::STEAL_REF)); + ByteBuffer buffer(&slices[0], 2); + EXPECT_EQ(strlen(kContent1) + strlen(kContent2), buffer.Length()); +} + +bool SliceEqual(const Slice& a, gpr_slice b) { + if (a.size() != GPR_SLICE_LENGTH(b)) { + return false; + } + for (size_t i = 0; i < a.size(); i++) { + if (a.begin()[i] != GPR_SLICE_START_PTR(b)[i]) { + return false; + } + } + return true; +} + +TEST_F(ByteBufferTest, Dump) { + gpr_slice hello = gpr_slice_from_copied_string(kContent1); + gpr_slice world = gpr_slice_from_copied_string(kContent2); + std::vector<Slice> slices; + slices.push_back(Slice(hello, Slice::STEAL_REF)); + slices.push_back(Slice(world, Slice::STEAL_REF)); + ByteBuffer buffer(&slices[0], 2); + slices.clear(); + buffer.Dump(&slices); + EXPECT_TRUE(SliceEqual(slices[0], hello)); + EXPECT_TRUE(SliceEqual(slices[1], world)); +} + +} // namespace +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index 32d61b0307..3c3baeb769 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -88,7 +88,7 @@ void ParseMetadataFlag( return; } std::vector<grpc::string> fields; - grpc::string delim(":"); + const char* delim = ":"; size_t cur, next = -1; do { cur = next + 1; diff --git a/test/cpp/util/slice_test.cc b/test/cpp/util/slice_test.cc new file mode 100644 index 0000000000..eb328490e1 --- /dev/null +++ b/test/cpp/util/slice_test.cc @@ -0,0 +1,77 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/slice.h> + +#include <grpc/support/slice.h> +#include <gtest/gtest.h> + +namespace grpc { +namespace { + +const char* kContent = "hello xxxxxxxxxxxxxxxxxxxx world"; + +class SliceTest : public ::testing::Test { + protected: + void CheckSlice(const Slice& s, const grpc::string& content) { + EXPECT_EQ(content.size(), s.size()); + EXPECT_EQ(content, + grpc::string(reinterpret_cast<const char*>(s.begin()), s.size())); + } +}; + +TEST_F(SliceTest, Steal) { + gpr_slice s = gpr_slice_from_copied_string(kContent); + Slice spp(s, Slice::STEAL_REF); + CheckSlice(spp, kContent); +} + +TEST_F(SliceTest, Add) { + gpr_slice s = gpr_slice_from_copied_string(kContent); + Slice spp(s, Slice::ADD_REF); + gpr_slice_unref(s); + CheckSlice(spp, kContent); +} + +TEST_F(SliceTest, Empty) { + Slice empty_slice; + CheckSlice(empty_slice, ""); +} + +} // namespace +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |