aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-03-23 12:44:28 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-03-23 12:44:28 -0700
commit8ad32091e6e23768a1fe2d466638ef4ab59a9e9e (patch)
tree7d4e511bce88dc646f9f2cca050408cb135fec4b /test/cpp/qps/server_async.cc
parent55bb5bdf803ffcb6e643d35611af4938ee29fd1c (diff)
Make sure that nothing gets added to cq after shutdown
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc21
1 files changed, 16 insertions, 5 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4312f597b2..5a27fff09a 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -33,6 +33,7 @@
#include <forward_list>
#include <functional>
+#include <mutex>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/signal.h>
@@ -64,7 +65,8 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port)
- : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
+ : srv_cq_(), async_service_(&srv_cq_), server_(nullptr),
+ shutdown_(false) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
@@ -100,7 +102,9 @@ class AsyncQpsServerTest : public Server {
// The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState(ok) == false) {
// this RPC context is done, so refresh it
- ctx->Reset();
+ std::lock_guard<std::mutex> g(shutdown_mutex_);
+ if (!shutdown_)
+ ctx->Reset();
}
}
return;
@@ -109,7 +113,11 @@ class AsyncQpsServerTest : public Server {
}
~AsyncQpsServerTest() {
server_->Shutdown();
- srv_cq_.Shutdown();
+ {
+ std::lock_guard<std::mutex> g(shutdown_mutex_);
+ shutdown_ = true;
+ srv_cq_.Shutdown();
+ }
for (auto &thr : threads_) {
thr.join();
}
@@ -195,7 +203,7 @@ class AsyncQpsServerTest : public Server {
class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextStreamingImpl(
- std::function<void(ServerContext *,
+ std::function<void(ServerContext *,
grpc::ServerAsyncReaderWriter<ResponseType,
RequestType> *, void *)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
@@ -228,7 +236,7 @@ class AsyncQpsServerTest : public Server {
next_state_ = &ServerRpcContextStreamingImpl::read_done;
return true;
}
-
+
bool read_done(bool ok) {
if (ok) {
// invoke the method
@@ -291,6 +299,9 @@ class AsyncQpsServerTest : public Server {
SimpleResponse,SimpleRequest> *, void *)>
request_streaming_;
std::forward_list<ServerRpcContext *> contexts_;
+
+ std::mutex shutdown_mutex_;
+ bool shutdown_;
};
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,