aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc65
1 files changed, 29 insertions, 36 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index a68f1ae7b6..dea8746331 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -37,7 +37,6 @@
#include <mutex>
#include <thread>
-#include <gflags/gflags.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
@@ -48,7 +47,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
-#include <gtest/gtest.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/core/util/test_config.h"
@@ -73,7 +71,8 @@ class AsyncQpsServerTest : public Server {
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
std::function<grpc::Status(const PayloadConfig &, const RequestType *,
- ResponseType *)> process_rpc)
+ ResponseType *)>
+ process_rpc)
: Server(config) {
char *server_address = NULL;
@@ -103,7 +102,7 @@ class AsyncQpsServerTest : public Server {
auto process_rpc_bound =
std::bind(process_rpc, config.payload_config(), _1, _2);
- for (int i = 0; i < 10000 / num_threads; i++) {
+ for (int i = 0; i < 15000; i++) {
for (int j = 0; j < num_threads; j++) {
if (request_unary_function) {
auto request_unary =
@@ -124,21 +123,24 @@ class AsyncQpsServerTest : public Server {
for (int i = 0; i < num_threads; i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
- }
- for (int i = 0; i < num_threads; i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
- server_->Shutdown();
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
- (*ss)->set_shutdown();
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+ auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
+ server_->Shutdown(deadline);
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
- (*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
@@ -151,22 +153,24 @@ class AsyncQpsServerTest : public Server {
}
private:
- void ThreadFunc(int rank) {
+ void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- const bool still_going = ctx->RunNextState(ok);
- if (!shutdown_state_[rank]->shutdown()) {
- // this RPC context is done, so refresh it
- if (!still_going) {
- ctx->Reset();
- }
- } else {
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ const bool still_going = ctx->RunNextState(ok);
+ // if this RPC context is done, refresh it
+ if (!still_going) {
+ ctx->Reset();
+ }
}
return;
}
@@ -190,7 +194,8 @@ class AsyncQpsServerTest : public Server {
ServerRpcContextUnaryImpl(
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType> *,
- void *)> request_method,
+ void *)>
+ request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
@@ -333,24 +338,12 @@ class AsyncQpsServerTest : public Server {
ServiceType async_service_;
std::forward_list<ServerRpcContext *> contexts_;
- class PerThreadShutdownState {
- public:
- PerThreadShutdownState() : shutdown_(false) {}
-
- bool shutdown() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return shutdown_;
- }
-
- void set_shutdown() {
- std::lock_guard<std::mutex> lock(mutex_);
- shutdown_ = true;
- }
-
- private:
- mutable std::mutex mutex_;
- bool shutdown_;
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
};
+
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};