diff options
author | Craig Tiller <ctiller@google.com> | 2017-06-19 11:21:02 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-06-19 11:21:02 -0700 |
commit | e7183b77a69ba6f28ed1e9aad2639a7bcaaa66a2 (patch) | |
tree | 69656ab72706a1348b64296a1efce14369ae74c5 /test/cpp/qps | |
parent | 6a23105aa93454b3fdddff092dfe26c96eda0d6e (diff) | |
parent | 070a8eeb281a2659501a60b1bbc86798fcb652c4 (diff) |
Merge github.com:grpc/grpc into cq-drop
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client_async.cc | 47 | ||||
-rw-r--r-- | test/cpp/qps/qps_json_driver.cc | 10 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 18 |
3 files changed, 65 insertions, 10 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 03e116e26c..f7dda0f758 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -55,6 +55,11 @@ class ClientRpcContext { } virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0; + void lock() { mu_.lock(); } + void unlock() { mu_.unlock(); } + + private: + std::mutex mu_; }; template <class RequestType, class ResponseType> @@ -106,6 +111,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq); } @@ -163,8 +169,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { num_async_threads_(NumThreads(config)) { SetupLoadTest(config, num_async_threads_); - for (int i = 0; i < num_async_threads_; i++) { + int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified + int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator + for (int i = 0; i < num_cqs; i++) { cli_cqs_.emplace_back(new CompletionQueue); + } + + for (int i = 0; i < num_async_threads_; i++) { + cq_.emplace_back(i % cli_cqs_.size()); next_issuers_.emplace_back(NextIssuer(i)); shutdown_state_.emplace_back(new PerThreadShutdownState()); } @@ -231,20 +243,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { void* got_tag; bool ok; - if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { + if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); if (shutdown_state_[thread_idx]->shutdown) { + // We want to delete the context. However, it is possible that + // another thread that just initiated an action on this + // context still has its lock even though the action on the + // context has completed. To delay for that, just grab the + // lock for serialization. Take a new scope. + { std::lock_guard<ClientRpcContext> lctx(*ctx); } delete ctx; return true; - } else if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[thread_idx].get()); - // delete the old version + } + bool del = false; + + // Create a new scope for a lock_guard'ed region + { + std::lock_guard<ClientRpcContext> lctx(*ctx); + if (!ctx->RunNextState(ok, entry)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + // set the old version to delete + del = true; + } + } + if (del) { delete ctx; } return true; @@ -255,6 +283,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; + std::vector<int> cq_; std::vector<std::function<gpr_timespec()>> next_issuers_; std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; @@ -377,6 +406,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq, messages_per_stream_); } @@ -515,6 +545,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromClientImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq); } @@ -632,6 +663,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromServerImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq); } @@ -774,6 +806,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextGenericStreamingImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq, messages_per_stream_); } diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index a946992100..590c22ec29 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -16,6 +16,7 @@ * */ +#include <fstream> #include <iostream> #include <memory> #include <set> @@ -57,6 +58,8 @@ DEFINE_string(qps_server_target_override, "", "Override QPS server target to configure in client configs." "Only applicable if there is a single benchmark server."); +DEFINE_string(json_file_out, "", "File to write the JSON output to."); + namespace grpc { namespace testing { @@ -88,6 +91,13 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, *success = result->server_success(i); } + if (FLAGS_json_file_out != "") { + std::ofstream json_outfile; + json_outfile.open(FLAGS_json_file_out); + json_outfile << "{\"qps\": " << result->summary().qps() << "}\n"; + json_outfile.close(); + } + return result; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 9cb281740f..96d7e5ef74 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -16,6 +16,7 @@ * */ +#include <algorithm> #include <forward_list> #include <functional> #include <memory> @@ -89,9 +90,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server { gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads); } - for (int i = 0; i < num_threads; i++) { + int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified + int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator + for (int i = 0; i < num_cqs; i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } + for (int i = 0; i < num_threads; i++) { + cq_.emplace_back(i % srv_cqs_.size()); + } if (config.resource_quota_size() > 0) { builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest") @@ -105,7 +111,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::placeholders::_2); for (int i = 0; i < 5000; i++) { - for (int j = 0; j < num_threads; j++) { + for (int j = 0; j < num_cqs; j++) { if (request_unary_function) { auto request_unary = std::bind( request_unary_function, &async_service_, std::placeholders::_1, @@ -190,7 +196,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) { + while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that @@ -199,6 +205,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { if (shutdown_state_[thread_idx]->shutdown) { return; } + std::lock_guard<ServerRpcContext> l2(*ctx); const bool still_going = ctx->RunNextState(ok); // if this RPC context is done, refresh it if (!still_going) { @@ -211,9 +218,13 @@ class AsyncQpsServerTest final : public grpc::testing::Server { class ServerRpcContext { public: ServerRpcContext() {} + void lock() { mu_.lock(); } + void unlock() { mu_.unlock(); } virtual ~ServerRpcContext(){}; virtual bool RunNextState(bool) = 0; // next state, return false if done virtual void Reset() = 0; // start this back at a clean state + private: + std::mutex mu_; }; static void *tag(ServerRpcContext *func) { return reinterpret_cast<void *>(func); @@ -503,6 +514,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; + std::vector<int> cq_; ServiceType async_service_; std::vector<std::unique_ptr<ServerRpcContext>> contexts_; |