aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc232
-rw-r--r--test/cpp/end2end/end2end_test.cc34
-rw-r--r--test/cpp/end2end/test_service_impl.cc15
-rw-r--r--test/cpp/qps/client.h35
-rw-r--r--test/cpp/qps/driver.cc32
-rw-r--r--test/cpp/util/test_credentials_provider.h5
6 files changed, 238 insertions, 115 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 9ca3bf98f8..dc8c2bb6e5 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -68,6 +68,7 @@ namespace testing {
namespace {
void* tag(int i) { return (void*)(intptr_t)i; }
+int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
#ifdef GPR_POSIX_SOCKET
static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
@@ -106,37 +107,50 @@ class PollingOverrider {
class Verifier {
public:
explicit Verifier(bool spin) : spin_(spin) {}
+ // Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
expectations_[tag(i)] = expect_ok;
return *this;
}
+ // Next waits for 1 async tag to complete, checks its
+ // expectations, and returns the tag
+ int Next(CompletionQueue* cq, bool ignore_ok) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ for (;;) {
+ auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_TRUE(cq->Next(&got_tag, &ok));
+ }
+ auto it = expectations_.find(got_tag);
+ EXPECT_TRUE(it != expectations_.end());
+ if (!ignore_ok) {
+ EXPECT_EQ(it->second, ok);
+ }
+ expectations_.erase(it);
+ return detag(got_tag);
+ }
+
+ // Verify keeps calling Next until all currently set
+ // expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }
+ // This version of Verify allows optionally ignoring the
+ // outcome of the expectation
void Verify(CompletionQueue* cq, bool ignore_ok) {
GPR_ASSERT(!expectations_.empty());
while (!expectations_.empty()) {
- bool ok;
- void* got_tag;
- if (spin_) {
- for (;;) {
- auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
- if (r == CompletionQueue::TIMEOUT) continue;
- if (r == CompletionQueue::GOT_EVENT) break;
- gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
- abort();
- }
- } else {
- EXPECT_TRUE(cq->Next(&got_tag, &ok));
- }
- auto it = expectations_.find(got_tag);
- EXPECT_TRUE(it != expectations_.end());
- if (!ignore_ok) {
- EXPECT_EQ(it->second, ok);
- }
- expectations_.erase(it);
+ Next(cq, ignore_ok);
}
}
+ // This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {
if (expectations_.empty()) {
@@ -793,7 +807,8 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
}
// This class is for testing scenarios where RPCs are cancelled on the server
-// by calling ServerContext::TryCancel()
+// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
+// API to check for cancellation
class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
protected:
typedef enum {
@@ -803,13 +818,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase;
- void ServerTryCancel(ServerContext* context) {
- EXPECT_FALSE(context->IsCancelled());
- context->TryCancel();
- gpr_log(GPR_INFO, "Server called TryCancel()");
- EXPECT_TRUE(context->IsCancelled());
- }
-
// Helper for testing client-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios:
@@ -843,6 +851,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
+ srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -858,9 +867,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
+ bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- ServerTryCancel(&srv_ctx);
+ srv_ctx.TryCancel();
+ Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
// for sure that all cq results will return false from this point forward
@@ -868,22 +880,39 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
std::thread* server_try_cancel_thd = NULL;
+
+ auto verif = Verifier(GetParam());
+
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd = new std::thread(
- &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+ server_try_cancel_thd =
+ new std::thread(&ServerContext::TryCancel, &srv_ctx);
// Server will cancel the RPC in a parallel thread while reading the
    // requests from the client. Since the cancellation can happen at any time,
// some of the cq results (i.e those until cancellation) might be true but
// its non deterministic. So better to ignore the cq results
ignore_cq_result = true;
+ // Expect that we might possibly see the done tag that
+ // indicates cancellation completion in this case
+ want_done_tag = true;
+ verif.Expect(11, true);
}
// Server reads 3 messages (tags 6, 7 and 8)
+ // But if want_done_tag is true, we might also see tag 11
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
srv_stream.Read(&recv_request, tag(tag_idx));
- Verifier(GetParam())
- .Expect(tag_idx, expected_server_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
+ // Note that we'll add something to the verifier and verify that
+ // something was seen, but it might be tag 11 and not what we
+ // just added
+ int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
+ .Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
+ }
}
if (server_try_cancel_thd != NULL) {
@@ -892,7 +921,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- ServerTryCancel(&srv_ctx);
+ srv_ctx.TryCancel();
+ want_done_tag = true;
+ verif.Expect(11, true);
+ }
+
+ if (want_done_tag) {
+ verif.Verify(cq_.get());
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
}
// The RPC has been cancelled at this point for sure (i.e irrespective of
@@ -945,6 +982,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
+ srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -952,9 +990,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_cq_result = true;
bool ignore_cq_result = false;
+ bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- ServerTryCancel(&srv_ctx);
+ srv_ctx.TryCancel();
+ Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
@@ -962,24 +1003,41 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
std::thread* server_try_cancel_thd = NULL;
+
+ auto verif = Verifier(GetParam());
+
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd = new std::thread(
- &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+ server_try_cancel_thd =
+ new std::thread(&ServerContext::TryCancel, &srv_ctx);
// Server will cancel the RPC in a parallel thread while writing responses
    // to the client. Since the cancellation can happen at any time, some of
// the cq results (i.e those until cancellation) might be true but it is
// non deterministic. So better to ignore the cq results
ignore_cq_result = true;
+ // Expect that we might possibly see the done tag that
+ // indicates cancellation completion in this case
+ want_done_tag = true;
+ verif.Expect(11, true);
}
// Server sends three messages (tags 3, 4 and 5)
+    // But if want_done_tag is true, we might also see tag 11
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_response.set_message("Pong " + std::to_string(tag_idx));
srv_stream.Write(send_response, tag(tag_idx));
- Verifier(GetParam())
- .Expect(tag_idx, expected_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
+ // Note that we'll add something to the verifier and verify that
+ // something was seen, but it might be tag 11 and not what we
+ // just added
+ int got_tag = verif.Expect(tag_idx, expected_cq_result)
+ .Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
+ }
}
if (server_try_cancel_thd != NULL) {
@@ -988,13 +1046,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- ServerTryCancel(&srv_ctx);
+ srv_ctx.TryCancel();
+ want_done_tag = true;
+ verif.Expect(11, true);
    // Client reads may fail because it is notified that the stream is
// cancelled.
ignore_cq_result = true;
}
+ if (want_done_tag) {
+ verif.Verify(cq_.get());
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ }
+
  // Client attempts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
@@ -1052,6 +1118,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
+ srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -1063,9 +1130,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_cq_result = true;
bool ignore_cq_result = false;
+ bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- ServerTryCancel(&srv_ctx);
+ srv_ctx.TryCancel();
+ Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
@@ -1073,42 +1143,84 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
std::thread* server_try_cancel_thd = NULL;
+
+ auto verif = Verifier(GetParam());
+
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd = new std::thread(
- &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+ server_try_cancel_thd =
+ new std::thread(&ServerContext::TryCancel, &srv_ctx);
// Since server is going to cancel the RPC in a parallel thread, some of
// the cq results (i.e those until the cancellation) might be true. Since
// that number is non-deterministic, it is better to ignore the cq results
ignore_cq_result = true;
+ // Expect that we might possibly see the done tag that
+ // indicates cancellation completion in this case
+ want_done_tag = true;
+ verif.Expect(11, true);
}
+ int got_tag;
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam())
- .Expect(4, expected_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
+ verif.Expect(4, expected_cq_result);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
+ }
send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam())
- .Expect(5, expected_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
+ verif.Expect(5, expected_cq_result);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
+ }
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam())
- .Expect(6, expected_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
+ verif.Expect(6, expected_cq_result);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
+ }
// This is expected to succeed in all cases
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
+ verif.Expect(7, true);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
+ }
// This is expected to fail in all cases i.e for all values of
// server_try_cancel. This is because at this point, either there are no
// more msgs from the client (because client called WritesDone) or the RPC
// is cancelled on the server
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ verif.Expect(8, false);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
+ if (got_tag == 11) {
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
+ // Now get the other entry that we were waiting on
+ EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
+ }
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
@@ -1116,7 +1228,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- ServerTryCancel(&srv_ctx);
+ srv_ctx.TryCancel();
+ want_done_tag = true;
+ verif.Expect(11, true);
+ }
+
+ if (want_done_tag) {
+ verif.Verify(cq_.get());
+ EXPECT_TRUE(srv_ctx.IsCancelled());
+ want_done_tag = false;
}
// The RPC has been cancelled at this point for sure (i.e irrespective of
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index dc2c4f6426..4759818322 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -59,6 +59,7 @@
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
+using grpc::testing::kTlsCredentialsType;
using std::chrono::system_clock;
namespace grpc {
@@ -1194,6 +1195,8 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
request.mutable_param()->set_echo_metadata(true);
request.mutable_param()->set_expected_client_identity(
TestAuthMetadataProcessor::kGoodGuy);
+ request.mutable_param()->set_expected_transport_security_type(
+ GetParam().credentials_type);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(request.message(), response.message());
@@ -1301,6 +1304,8 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
request.mutable_param()->set_echo_metadata(true);
request.mutable_param()->set_expected_client_identity(
TestAuthMetadataProcessor::kGoodGuy);
+ request.mutable_param()->set_expected_transport_security_type(
+ GetParam().credentials_type);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(request.message(), response.message());
@@ -1349,25 +1354,30 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
- request.mutable_param()->set_check_auth_context(true);
-
+ request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
+ kTlsCredentialsType);
+ request.mutable_param()->set_expected_transport_security_type(
+ GetParam().credentials_type);
ClientContext context;
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok());
std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
- std::vector<grpc::string_ref> ssl =
+ std::vector<grpc::string_ref> tst =
auth_ctx->FindPropertyValues("transport_security_type");
- EXPECT_EQ(1u, ssl.size());
- EXPECT_EQ("ssl", ToString(ssl[0]));
- EXPECT_EQ("x509_subject_alternative_name",
- auth_ctx->GetPeerIdentityPropertyName());
- EXPECT_EQ(3u, auth_ctx->GetPeerIdentity().size());
- EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
- EXPECT_EQ("waterzooi.test.google.be",
- ToString(auth_ctx->GetPeerIdentity()[1]));
- EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
+ EXPECT_EQ(1u, tst.size());
+ EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
+ if (GetParam().credentials_type == kTlsCredentialsType) {
+ EXPECT_EQ("x509_subject_alternative_name",
+ auth_ctx->GetPeerIdentityPropertyName());
+ EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
+ EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
+ EXPECT_EQ("waterzooi.test.google.be",
+ ToString(auth_ctx->GetPeerIdentity()[1]));
+ EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
+ EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
+ }
}
std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 7c3e514eff..fe29c4afe9 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -62,14 +62,16 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
}
}
-void CheckServerAuthContext(const ServerContext* context,
- const grpc::string& expected_client_identity) {
+void CheckServerAuthContext(
+ const ServerContext* context,
+ const grpc::string& expected_transport_security_type,
+ const grpc::string& expected_client_identity) {
std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
- std::vector<grpc::string_ref> ssl =
+ std::vector<grpc::string_ref> tst =
auth_ctx->FindPropertyValues("transport_security_type");
- EXPECT_EQ(1u, ssl.size());
- EXPECT_EQ("ssl", ToString(ssl[0]));
- if (expected_client_identity.length() == 0) {
+ EXPECT_EQ(1u, tst.size());
+ EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
+ if (expected_client_identity.empty()) {
EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
@@ -139,6 +141,7 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
(request->param().expected_client_identity().length() > 0 ||
request->param().check_auth_context())) {
CheckServerAuthContext(context,
+ request->param().expected_transport_security_type(),
request->param().expected_client_identity());
}
if (request->has_param() && request->param().response_message_length() > 0) {
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2dc83f0f29..92e77eed9b 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -123,15 +123,13 @@ class Client {
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
- }
- std::unique_ptr<UsageTimer> timer(new UsageTimer);
- timer_.swap(timer);
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
+ threads_[i]->Swap(&to_merge[i]);
latencies.Merge(to_merge[i]);
}
delete[] to_merge;
+
+ std::unique_ptr<UsageTimer> timer(new UsageTimer);
+ timer_.swap(timer);
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
@@ -227,7 +225,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -240,16 +237,9 @@ class Client {
impl_.join();
}
- void BeginSwap(Histogram* n) {
+ void Swap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_stats_ = n;
- }
-
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- while (new_stats_ != nullptr) {
- cv_.wait(g);
- };
+ n->Swap(&histogram_);
}
void MergeStatsInto(Histogram* hist) {
@@ -263,10 +253,11 @@ class Client {
void ThreadFunc() {
for (;;) {
+ // lock since the thread should only be doing one thing at a time
+ std::lock_guard<std::mutex> g(mu_);
// run the loop body
const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
- // lock, see if we're done
- std::lock_guard<std::mutex> g(mu_);
+ // see if we're done
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
@@ -274,19 +265,11 @@ class Client {
if (done_) {
return;
}
- // check if we're resetting stats, swap out the histogram if so
- if (new_stats_) {
- new_stats_->Swap(&histogram_);
- new_stats_ = nullptr;
- cv_.notify_one();
- }
}
}
std::mutex mu_;
- std::condition_variable cv_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 1c7fdf8796..bc8780f74d 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -348,19 +348,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
std::unique_ptr<ScenarioResult> result(new ScenarioResult);
result->client_config = result_client_config;
result->server_config = result_server_config;
- gpr_log(GPR_INFO, "Finishing");
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
- }
+ gpr_log(GPR_INFO, "Finishing clients");
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
- }
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
- const auto& stats = server_status.stats();
- result->server_resources.emplace_back(
- stats.time_elapsed(), stats.time_user(), stats.time_system(),
- server_status.cores());
+ GPR_ASSERT(client->stream->WritesDone());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
@@ -368,17 +359,30 @@ std::unique_ptr<ScenarioResult> RunScenario(
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
+ GPR_ASSERT(!client->stream->Read(&client_status));
}
-
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->WritesDone());
GPR_ASSERT(client->stream->Finish().ok());
}
+ delete[] clients;
+
+ gpr_log(GPR_INFO, "Finishing servers");
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Write(server_mark));
GPR_ASSERT(server->stream->WritesDone());
+ }
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Read(&server_status));
+ const auto& stats = server_status.stats();
+ result->server_resources.emplace_back(
+ stats.time_elapsed(), stats.time_user(), stats.time_system(),
+ server_status.cores());
+ GPR_ASSERT(!server->stream->Read(&server_status));
+ }
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Finish().ok());
}
- delete[] clients;
+
delete[] servers;
return result;
}
diff --git a/test/cpp/util/test_credentials_provider.h b/test/cpp/util/test_credentials_provider.h
index 50fadb53a2..1fb311e556 100644
--- a/test/cpp/util/test_credentials_provider.h
+++ b/test/cpp/util/test_credentials_provider.h
@@ -44,7 +44,10 @@ namespace grpc {
namespace testing {
const char kInsecureCredentialsType[] = "INSECURE_CREDENTIALS";
-const char kTlsCredentialsType[] = "TLS_CREDENTIALS";
+
+// For real credentials, like tls/ssl, this name should match the AuthContext
+// property "transport_security_type".
+const char kTlsCredentialsType[] = "ssl";
// Provide test credentials of a particular type.
class CredentialTypeProvider {