diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 232 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 34 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 15 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 35 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 32 | ||||
-rw-r--r-- | test/cpp/util/test_credentials_provider.h | 5 |
6 files changed, 238 insertions, 115 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 9ca3bf98f8..dc8c2bb6e5 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -68,6 +68,7 @@ namespace testing { namespace { void* tag(int i) { return (void*)(intptr_t)i; } +int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } #ifdef GPR_POSIX_SOCKET static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, @@ -106,37 +107,50 @@ class PollingOverrider { class Verifier { public: explicit Verifier(bool spin) : spin_(spin) {} + // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { expectations_[tag(i)] = expect_ok; return *this; } + // Next waits for 1 async tag to complete, checks its + // expectations, and returns the tag + int Next(CompletionQueue* cq, bool ignore_ok) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + } + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } + expectations_.erase(it); + return detag(got_tag); + } + + // Verify keeps calling Next until all currently set + // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } + // This version of Verify allows optionally ignoring the + // outcome of the expectation void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { - bool ok; - void* got_tag; - if (spin_) { - for (;;) { - auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); - if (r == CompletionQueue::TIMEOUT) continue; - if (r == CompletionQueue::GOT_EVENT) break; - gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); - abort(); - } - } else { - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - } - auto it = expectations_.find(got_tag); - EXPECT_TRUE(it != expectations_.end()); - if (!ignore_ok) { - EXPECT_EQ(it->second, ok); - } - expectations_.erase(it); + Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { if (expectations_.empty()) { @@ -793,7 +807,8 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { } // This class is for testing scenarios where RPCs are cancelled on the server -// by calling ServerContext::TryCancel() +// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone +// API to check for cancellation class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { protected: typedef enum { @@ -803,13 +818,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; - void ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel()"); - EXPECT_TRUE(context->IsCancelled()); - } - // Helper for testing client-streaming RPCs which are cancelled on the server. // Depending on the value of server_try_cancel parameter, this will test one // of the following three scenarios: @@ -843,6 +851,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -858,9 +867,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_server_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know // for sure that all cq results will return false from this point forward @@ -868,22 +880,39 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but // its non deterministic. So better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } // Server reads 3 messages (tags 6, 7 and 8) + // But if want_done_tag is true, we might also see tag 11 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { srv_stream.Read(&recv_request, tag(tag_idx)); - Verifier(GetParam()) - .Expect(tag_idx, expected_server_cq_result) - .Verify(cq_.get(), ignore_cq_result); + // Note that we'll add something to the verifier and verify that + // something was seen, but it might be tag 11 and not what we + // just added + int got_tag = verif.Expect(tag_idx, expected_server_cq_result) + .Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); + } } if (server_try_cancel_thd != NULL) { @@ -892,7 +921,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); + } + + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; } // The RPC has been cancelled at this point for sure (i.e irrespective of @@ -945,6 +982,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -952,9 +990,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -962,24 +1003,41 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of // the cq results (i.e those until cancellation) might be true but it is // non deterministic. So better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } // Server sends three messages (tags 3, 4 and 5) + // But if want_done tag is true, we might also see tag 11 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_response.set_message("Pong " + std::to_string(tag_idx)); srv_stream.Write(send_response, tag(tag_idx)); - Verifier(GetParam()) - .Expect(tag_idx, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + // Note that we'll add something to the verifier and verify that + // something was seen, but it might be tag 11 and not what we + // just added + int got_tag = verif.Expect(tag_idx, expected_cq_result) + .Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); + } } if (server_try_cancel_thd != NULL) { @@ -988,13 +1046,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); // Client reads may fail bacause it is notified that the stream is // cancelled. ignore_cq_result = true; } + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + } + // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); @@ -1052,6 +1118,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -1063,9 +1130,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1073,42 +1143,84 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since // that number is non-deterministic, it is better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } + int got_tag; srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()) - .Expect(4, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(4, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); + } send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()) - .Expect(5, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(5, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); + } cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()) - .Expect(6, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(6, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); + } // This is expected to succeed in all cases cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); + verif.Expect(7, true); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7); + } // This is expected to fail in all cases i.e for all values of // server_try_cancel. This is because at this point, either there are no // more msgs from the client (because client called WritesDone) or the RPC // is cancelled on the server srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + verif.Expect(8, false); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8); + } if (server_try_cancel_thd != NULL) { server_try_cancel_thd->join(); @@ -1116,7 +1228,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); + } + + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; } // The RPC has been cancelled at this point for sure (i.e irrespective of diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index dc2c4f6426..4759818322 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -59,6 +59,7 @@ using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; +using grpc::testing::kTlsCredentialsType; using std::chrono::system_clock; namespace grpc { @@ -1194,6 +1195,8 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { request.mutable_param()->set_echo_metadata(true); request.mutable_param()->set_expected_client_identity( TestAuthMetadataProcessor::kGoodGuy); + request.mutable_param()->set_expected_transport_security_type( + GetParam().credentials_type); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(request.message(), response.message()); @@ -1301,6 +1304,8 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { request.mutable_param()->set_echo_metadata(true); request.mutable_param()->set_expected_client_identity( TestAuthMetadataProcessor::kGoodGuy); + request.mutable_param()->set_expected_transport_security_type( + GetParam().credentials_type); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(request.message(), response.message()); @@ -1349,25 +1354,30 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) { EchoRequest request; EchoResponse response; request.set_message("Hello"); - request.mutable_param()->set_check_auth_context(true); - + request.mutable_param()->set_check_auth_context(GetParam().credentials_type == + kTlsCredentialsType); + request.mutable_param()->set_expected_transport_security_type( + GetParam().credentials_type); ClientContext context; Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.ok()); std::shared_ptr<const AuthContext> auth_ctx = context.auth_context(); - std::vector<grpc::string_ref> ssl = + std::vector<grpc::string_ref> tst = auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, ssl.size()); - EXPECT_EQ("ssl", ToString(ssl[0])); - EXPECT_EQ("x509_subject_alternative_name", - auth_ctx->GetPeerIdentityPropertyName()); - EXPECT_EQ(3u, auth_ctx->GetPeerIdentity().size()); - EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0])); - EXPECT_EQ("waterzooi.test.google.be", - ToString(auth_ctx->GetPeerIdentity()[1])); - EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2])); + EXPECT_EQ(1u, tst.size()); + EXPECT_EQ(GetParam().credentials_type, ToString(tst[0])); + if (GetParam().credentials_type == kTlsCredentialsType) { + EXPECT_EQ("x509_subject_alternative_name", + auth_ctx->GetPeerIdentityPropertyName()); + EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size()); + EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0])); + EXPECT_EQ("waterzooi.test.google.be", + ToString(auth_ctx->GetPeerIdentity()[1])); + EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2])); + EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3])); + } } std::vector<TestScenario> CreateTestScenarios(bool use_proxy, diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 7c3e514eff..fe29c4afe9 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -62,14 +62,16 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, } } -void CheckServerAuthContext(const ServerContext* context, - const grpc::string& expected_client_identity) { +void CheckServerAuthContext( + const ServerContext* context, + const grpc::string& expected_transport_security_type, + const grpc::string& expected_client_identity) { std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); - std::vector<grpc::string_ref> ssl = + std::vector<grpc::string_ref> tst = auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, ssl.size()); - EXPECT_EQ("ssl", ToString(ssl[0])); - if (expected_client_identity.length() == 0) { + EXPECT_EQ(1u, tst.size()); + EXPECT_EQ(expected_transport_security_type, ToString(tst[0])); + if (expected_client_identity.empty()) { EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); @@ -139,6 +141,7 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, (request->param().expected_client_identity().length() > 0 || request->param().check_auth_context())) { CheckServerAuthContext(context, + request->param().expected_transport_security_type(), request->param().expected_client_identity()); } if (request->has_param() && request->param().response_message_length() > 0) { diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2dc83f0f29..92e77eed9b 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -123,15 +123,13 @@ class Client { if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->BeginSwap(&to_merge[i]); - } - std::unique_ptr<UsageTimer> timer(new UsageTimer); - timer_.swap(timer); - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->EndSwap(); + threads_[i]->Swap(&to_merge[i]); latencies.Merge(to_merge[i]); } delete[] to_merge; + + std::unique_ptr<UsageTimer> timer(new UsageTimer); + timer_.swap(timer); timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram @@ -227,7 +225,6 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), - new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -240,16 +237,9 @@ class Client { impl_.join(); } - void BeginSwap(Histogram* n) { + void Swap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); - new_stats_ = n; - } - - void EndSwap() { - std::unique_lock<std::mutex> g(mu_); - while (new_stats_ != nullptr) { - cv_.wait(g); - }; + n->Swap(&histogram_); } void MergeStatsInto(Histogram* hist) { @@ -263,10 +253,11 @@ class Client { void ThreadFunc() { for (;;) { + // lock since the thread should only be doing one thing at a time + std::lock_guard<std::mutex> g(mu_); // run the loop body const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // lock, see if we're done - std::lock_guard<std::mutex> g(mu_); + // see if we're done if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -274,19 +265,11 @@ class Client { if (done_) { return; } - // check if we're resetting stats, swap out the histogram if so - if (new_stats_) { - new_stats_->Swap(&histogram_); - new_stats_ = nullptr; - cv_.notify_one(); - } } } std::mutex mu_; - std::condition_variable cv_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 1c7fdf8796..bc8780f74d 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -348,19 +348,10 @@ std::unique_ptr<ScenarioResult> RunScenario( std::unique_ptr<ScenarioResult> result(new ScenarioResult); result->client_config = result_client_config; result->server_config = result_server_config; - gpr_log(GPR_INFO, "Finishing"); - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Write(server_mark)); - } + gpr_log(GPR_INFO, "Finishing clients"); for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Write(client_mark)); - } - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Read(&server_status)); - const auto& stats = server_status.stats(); - result->server_resources.emplace_back( - stats.time_elapsed(), stats.time_user(), stats.time_system(), - server_status.cores()); + GPR_ASSERT(client->stream->WritesDone()); } for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); @@ -368,17 +359,30 @@ std::unique_ptr<ScenarioResult> RunScenario( result->latencies.MergeProto(stats.latencies()); result->client_resources.emplace_back( stats.time_elapsed(), stats.time_user(), stats.time_system(), -1); + GPR_ASSERT(!client->stream->Read(&client_status)); } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->WritesDone()); GPR_ASSERT(client->stream->Finish().ok()); } + delete[] clients; + + gpr_log(GPR_INFO, "Finishing servers"); for (auto server = &servers[0]; server != &servers[num_servers]; server++) { + GPR_ASSERT(server->stream->Write(server_mark)); GPR_ASSERT(server->stream->WritesDone()); + } + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { + GPR_ASSERT(server->stream->Read(&server_status)); + const auto& stats = server_status.stats(); + result->server_resources.emplace_back( + stats.time_elapsed(), stats.time_user(), stats.time_system(), + server_status.cores()); + GPR_ASSERT(!server->stream->Read(&server_status)); + } + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Finish().ok()); } - delete[] clients; + delete[] servers; return result; } diff --git a/test/cpp/util/test_credentials_provider.h b/test/cpp/util/test_credentials_provider.h index 50fadb53a2..1fb311e556 100644 --- a/test/cpp/util/test_credentials_provider.h +++ b/test/cpp/util/test_credentials_provider.h @@ -44,7 +44,10 @@ namespace grpc { namespace testing { const char kInsecureCredentialsType[] = "INSECURE_CREDENTIALS"; -const char kTlsCredentialsType[] = "TLS_CREDENTIALS"; + +// For real credentials, like tls/ssl, this name should match the AuthContext +// property "transport_security_type". +const char kTlsCredentialsType[] = "ssl"; // Provide test credentials of a particular type. class CredentialTypeProvider { |