diff options
author | Vijay Pai <vpai@google.com> | 2015-03-23 12:44:28 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-03-23 12:44:28 -0700 |
commit | 8ad32091e6e23768a1fe2d466638ef4ab59a9e9e (patch) | |
tree | 7d4e511bce88dc646f9f2cca050408cb135fec4b /test | |
parent | 55bb5bdf803ffcb6e643d35611af4938ee29fd1c (diff) |
Make sure that nothing gets added to cq after shutdown
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/server_async.cc | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4312f597b2..5a27fff09a 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -33,6 +33,7 @@ #include <forward_list> #include <functional> +#include <mutex> #include <sys/time.h> #include <sys/resource.h> #include <sys/signal.h> @@ -64,7 +65,8 @@ namespace testing { class AsyncQpsServerTest : public Server { public: AsyncQpsServerTest(const ServerConfig &config, int port) - : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + : srv_cq_(), async_service_(&srv_cq_), server_(nullptr), + shutdown_(false) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -100,7 +102,9 @@ class AsyncQpsServerTest : public Server { // The tag is a pointer to an RPC context to invoke if (ctx->RunNextState(ok) == false) { // this RPC context is done, so refresh it - ctx->Reset(); + std::lock_guard<std::mutex> g(shutdown_mutex_); + if (!shutdown_) + ctx->Reset(); } } return; @@ -109,7 +113,11 @@ class AsyncQpsServerTest : public Server { } ~AsyncQpsServerTest() { server_->Shutdown(); - srv_cq_.Shutdown(); + { + std::lock_guard<std::mutex> g(shutdown_mutex_); + shutdown_ = true; + srv_cq_.Shutdown(); + } for (auto &thr : threads_) { thr.join(); } @@ -195,7 +203,7 @@ class AsyncQpsServerTest : public Server { class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function<void(ServerContext *, + std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> @@ -228,7 +236,7 @@ class AsyncQpsServerTest : public Server { next_state_ = &ServerRpcContextStreamingImpl::read_done; return true; } - + bool read_done(bool ok) { if (ok) { // invoke the method @@ -291,6 +299,9 @@ class AsyncQpsServerTest : public Server { SimpleResponse,SimpleRequest> *, void *)> request_streaming_; std::forward_list<ServerRpcContext *> contexts_; + + std::mutex shutdown_mutex_; + bool shutdown_; }; std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, |