diff options
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 65 |
1 files changed, 29 insertions, 36 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index a68f1ae7b6..dea8746331 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -37,7 +37,6 @@ #include <mutex> #include <thread> -#include <gflags/gflags.h> #include <grpc++/generic/async_generic_service.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> @@ -48,7 +47,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> -#include <gtest/gtest.h> #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/core/util/test_config.h" @@ -73,7 +71,8 @@ class AsyncQpsServerTest : public Server { CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function<grpc::Status(const PayloadConfig &, const RequestType *, - ResponseType *)> process_rpc) + ResponseType *)> + process_rpc) : Server(config) { char *server_address = NULL; @@ -103,7 +102,7 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / num_threads; i++) { + for (int i = 0; i < 15000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = @@ -124,21 +123,24 @@ class AsyncQpsServerTest : public Server { for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); - } - for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } ~AsyncQpsServerTest() { - server_->Shutdown(); for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - (*ss)->set_shutdown(); + std::lock_guard<std::mutex> lock((*ss)->mutex); + (*ss)->shutdown = true; + } + // TODO (vpai): Remove this deadline and allow Shutdown to finish properly + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); + server_->Shutdown(deadline); + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { - (*cq)->Shutdown(); bool ok; void *got_tag; while ((*cq)->Next(&got_tag, &ok)) @@ -151,22 +153,24 @@ class AsyncQpsServerTest : public Server { } private: - void ThreadFunc(int rank) { + void ThreadFunc(int thread_idx) { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[rank]->Next(&got_tag, &ok)) { + while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke - const bool still_going = ctx->RunNextState(ok); - if (!shutdown_state_[rank]->shutdown()) { - // this RPC context is done, so refresh it - if (!still_going) { - ctx->Reset(); - } - } else { + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { return; } + const bool still_going = ctx->RunNextState(ok); + // if this RPC context is done, refresh it + if (!still_going) { + ctx->Reset(); + } } return; } @@ -190,7 +194,8 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, - void *)> request_method, + void *)> + request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), @@ -333,24 +338,12 @@ class AsyncQpsServerTest : public Server { ServiceType async_service_; std::forward_list<ServerRpcContext *> contexts_; - class PerThreadShutdownState { - public: - PerThreadShutdownState() : shutdown_(false) {} - - bool shutdown() const { - std::lock_guard<std::mutex> lock(mutex_); - return shutdown_; - } - - void set_shutdown() { - std::lock_guard<std::mutex> lock(mutex_); - shutdown_ = true; - } - - private: - mutable std::mutex mutex_; - bool shutdown_; + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} }; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; |