path: root/test/cpp
author     kpayson64 <kpayson@google.com>    2017-10-25 10:28:02 -0700
committer  GitHub <noreply@github.com>       2017-10-25 10:28:02 -0700
commit     1bf7207852b4138c8a30e5a2f8f2c4bfffbba262 (patch)
tree       a49ce2246b55aa230802c909898a20d136f3561d /test/cpp
parent     1bda5106421c5e6f449e6dbdd2214cdf31b26fcf (diff)
parent     42bd87e376913939850bfa78a3c7f96ce83af11e (diff)
Merge pull request #13084 from kpayson64/cq_lambda
CompletionQueue DoThenAsyncNext
Diffstat (limited to 'test/cpp')
-rw-r--r--  test/cpp/end2end/async_end2end_test.cc  | 111
-rw-r--r--  test/cpp/qps/client.h                   |  44
-rw-r--r--  test/cpp/qps/client_async.cc            |  48
-rw-r--r--  test/cpp/qps/client_sync.cc             |  39
-rw-r--r--  test/cpp/qps/server_async.cc            |  31
5 files changed, 208 insertions(+), 65 deletions(-)
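
Every file in this diff builds on the new CompletionQueue::DoThenAsyncNext(functor, tag, ok, deadline), which runs the functor first and then behaves like AsyncNext; presumably this lets operations started inside the functor complete without an extra trip through the queue. A minimal sketch of the polling loop it enables, with DrainLoop and HandleEvent as illustrative names and an assumed include path:

  #include <grpc++/grpc++.h>  // assumed include path for grpc::CompletionQueue

  void HandleEvent(void* tag, bool ok);  // hypothetical handler, not part of this change

  void DrainLoop(grpc::CompletionQueue* cq) {
    void* got_tag;
    bool ok;
    if (!cq->Next(&got_tag, &ok)) {
      return;  // queue already shut down
    }
    do {
      // Empty body: all per-event work happens in the functor below, which
      // DoThenAsyncNext runs before polling the queue again, so it still
      // sees the tag/ok pair from the previous poll.
    } while (cq->DoThenAsyncNext([&]() { HandleEvent(got_tag, ok); }, &got_tag,
                                 &ok, gpr_inf_future(GPR_CLOCK_REALTIME)) ==
             grpc::CompletionQueue::GOT_EVENT);
  }

The server and client QPS changes below follow exactly this shape: one blocking Next() to prime the loop, then DoThenAsyncNext() for every subsequent event.
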
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 2a33e8ae11..b7634d0438 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -99,7 +99,7 @@ class PollingOverrider {
class Verifier {
public:
- explicit Verifier(bool spin) : spin_(spin) {}
+ explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
return ExpectUnless(i, expect_ok, false);
@@ -142,6 +142,18 @@ class Verifier {
return detag(got_tag);
}
+ template <typename T>
+ CompletionQueue::NextStatus DoOnceThenAsyncNext(
+ CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
+ std::function<void(void)> lambda) {
+ if (lambda_run_) {
+ return cq->AsyncNext(got_tag, ok, deadline);
+ } else {
+ lambda_run_ = true;
+ return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
+ }
+ }
+
// Verify keeps calling Next until all currently set
// expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }
@@ -154,6 +166,7 @@ class Verifier {
Next(cq, ignore_ok);
}
}
+
// This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {
@@ -193,6 +206,47 @@ class Verifier {
}
}
+ // This version of Verify stops after a certain deadline, and uses the
+ // DoThenAsyncNext API
+ // to call the lambda
+ void Verify(CompletionQueue* cq,
+ std::chrono::system_clock::time_point deadline,
+ std::function<void(void)> lambda) {
+ if (expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ while (std::chrono::system_clock::now() < deadline) {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ while (!expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ for (;;) {
+ GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+ auto r = DoOnceThenAsyncNext(
+ cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::GOT_EVENT);
+ }
+ GotTag(got_tag, ok, false);
+ }
+ }
+ }
+
private:
void GotTag(void* got_tag, bool ok, bool ignore_ok) {
auto it = expectations_.find(got_tag);
@@ -226,6 +280,7 @@ class Verifier {
std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_;
+ bool lambda_run_;
};
bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
@@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
EXPECT_TRUE(recv_status.ok());
}
+// Test a simple RPC using the async version of Next
+TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+
+ std::chrono::system_clock::time_point time_now(
+ std::chrono::system_clock::now());
+ std::chrono::system_clock::time_point time_limit(
+ std::chrono::system_clock::now() + std::chrono::seconds(10));
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+
+ auto resp_writer_ptr = &response_writer;
+ auto lambda_2 = [&, this, resp_writer_ptr]() {
+ gpr_log(GPR_ERROR, "CALLED");
+ service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
+ cq_.get(), tag(2));
+ };
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit, lambda_2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ auto recv_resp_ptr = &recv_response;
+ auto status_ptr = &recv_status;
+ send_response.set_message(recv_request.message());
+ auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
+ resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
+ };
+ response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
+ lambda_3);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Two pings and a final pong.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
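
The new DoThenAsyncNextRpc test drives each server-side step from inside the lambda handed to Verify, and the Verifier's lambda_run_ flag guarantees the lambda executes only on the first poll of that Verify call. Condensed to its essentials (fixture members written as plain locals, otherwise as in the test above):

  Verifier(/*spin=*/false)
      .Expect(2, true)
      .Verify(cq, std::chrono::system_clock::time_point::max(), [&] {
        // Runs exactly once, before the first poll: start the server-side
        // request whose completion will surface as tag(2).
        service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq,
                             cq, tag(2));
      });
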
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index a5049e5502..82c6361abd 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -230,8 +230,6 @@ class Client {
}
virtual void DestroyMultithreading() = 0;
- virtual void InitThreadFunc(size_t thread_idx) = 0;
- virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -279,7 +277,6 @@ class Client {
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
- private:
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -299,6 +296,16 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
+ void UpdateHistogram(HistogramEntry* entry) {
+ std::lock_guard<std::mutex> g(mu_);
+ if (entry->value_used()) {
+ histogram_.Add(entry->value());
+ }
+ if (entry->status_used()) {
+ statuses_[entry->status()]++;
+ }
+ }
+
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@@ -314,29 +321,8 @@ class Client {
wait_loop++;
}
- client_->InitThreadFunc(idx_);
-
- for (;;) {
- // run the loop body
- HistogramEntry entry;
- const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
- // lock, update histogram if needed and see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (entry.value_used()) {
- histogram_.Add(entry.value());
- }
- if (entry.status_used()) {
- statuses_[entry.status()]++;
- }
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
- if (!thread_still_ok ||
- static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
- client_->CompleteThread();
- return;
- }
- }
+ client_->ThreadFunc(idx_, this);
+ client_->CompleteThread();
}
std::mutex mu_;
@@ -347,6 +333,12 @@ class Client {
std::thread impl_;
};
+ bool ThreadCompleted() {
+ return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+ }
+
+ virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
+
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<UsageTimer> timer_;
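
With the driver loop removed from Client::Thread, each concrete client now owns its own loop inside ThreadFunc(thread_idx, t) and reports back through the two members added here. One iteration of such a loop might look like the following sketch (IssueOneRpc is a hypothetical helper; UpdateHistogram and ThreadCompleted are the new hooks above):

  HistogramEntry entry;
  if (IssueOneRpc(thread_idx, &entry)) {  // hypothetical: records latency/status into entry
    t->UpdateHistogram(&entry);           // takes the thread's mutex internally
  }
  if (ThreadCompleted()) {
    return;                               // thread_pool_done_ has been set
  }
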
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed4e0b355..b5c7208664 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
- void InitThreadFunc(size_t thread_idx) override final {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final {
+ void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
void* got_tag;
bool ok;
- if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ HistogramEntry entry;
+ HistogramEntry* entry_ptr = &entry;
+ if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ClientRpcContext* ctx;
+ std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
+ do {
+ t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ shutdown_mu->lock();
if (shutdown_state_[thread_idx]->shutdown) {
ctx->TryCancel();
delete ctx;
- return true;
- }
- if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
+ while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ ctx = ClientRpcContext::detag(got_tag);
+ ctx->TryCancel();
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ return;
}
- return true;
- } else {
- // queue is shutting down, so we must be done
- return true;
- }
+ } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ bool next_ok = ok;
+ if (!ctx->RunNextState(next_ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
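
Note the lock handoff in the loop above: the old std::lock_guard could not span the restructured loop, so the shutdown mutex is locked in the do-body and released either on the shutdown early-out or inside the functor passed to DoThenAsyncNext, keeping RunNextState/StartNewClone under the guarantee that the thread is not shutting down. The skeleton of that handoff, with the event-processing details elided:

  std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
  do {
    shutdown_mu->lock();                    // taken in the loop body
    if (shutdown_state_[thread_idx]->shutdown) {
      shutdown_mu->unlock();                // released on the early-out path
      return;
    }
  } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
      [&]() {
        // ... RunNextState / StartNewClone on the previous event ...
        shutdown_mu->unlock();              // or released inside the functor
      },
      &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
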
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 94554a46b2..9f20b148eb 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -62,6 +62,25 @@ class SynchronousClient
virtual ~SynchronousClient(){};
+ virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+ virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
+
+ void ThreadFunc(size_t thread_idx, Thread* t) override {
+ InitThreadFuncImpl(thread_idx);
+ for (;;) {
+ // run the loop body
+ HistogramEntry entry;
+ const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
+ t->UpdateHistogram(&entry);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ }
+ if (!thread_still_ok || ThreadCompleted()) {
+ return;
+ }
+ }
+ }
+
protected:
// WaitToIssue returns false if we realize that we need to break out
bool WaitToIssue(int thread_idx) {
@@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
- void InitThreadFunc(size_t thread_idx) override {}
+ void InitThreadFuncImpl(size_t thread_idx) override {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
messages_issued_[thread_idx] = 0;
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
&responses_[thread_idx]);
last_issue_[thread_idx] = UsageTimer::Now();
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// Figure out how to make histogram sensible if this is rate-paced
if (!WaitToIssue(thread_idx)) {
return true;
@@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] =
stub->StreamingFromServer(&context_[thread_idx], request_);
last_recv_[thread_idx] = UsageTimer::Now();
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
double now = UsageTimer::Now();
@@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
}
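
The renames above leave the per-RPC bodies unchanged; only ownership of the loop moved into SynchronousClient::ThreadFunc. For a new synchronous workload, the contract the Impl hooks must honor is small. A hypothetical ThreadFuncImpl (IssueOneCall is an illustrative helper, not part of this change):

  bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
    if (!WaitToIssue(thread_idx)) {
      return true;  // pacing says not yet; stay in the base-class loop
    }
    double start = UsageTimer::Now();
    grpc::Status s = IssueOneCall(thread_idx);  // hypothetical RPC issue
    entry->set_status(s.error_code());
    if (!s.ok()) {
      return false;  // base loop logs the error and ends this thread
    }
    entry->set_value((UsageTimer::Now() - start) * 1e9);  // scale as the existing clients do
    return true;
  }
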
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 776371a2c6..4576be5bb3 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -202,23 +202,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
- ServerRpcContext *ctx = detag(got_tag);
+ if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ServerRpcContext *ctx;
+ std::mutex *mu_ptr;
+ do {
+ ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ mu_ptr = &shutdown_state_[thread_idx]->mutex;
+ mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
+ mu_ptr->unlock();
return;
}
- std::lock_guard<ServerRpcContext> l2(*ctx);
- const bool still_going = ctx->RunNextState(ok);
- // if this RPC context is done, refresh it
- if (!still_going) {
- ctx->Reset();
- }
- }
- return;
+ } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, mu_ptr]() {
+ ctx->lock();
+ if (!ctx->RunNextState(ok)) {
+ ctx->Reset();
+ }
+ ctx->unlock();
+ mu_ptr->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
class ServerRpcContext {