author    Craig Tiller <ctiller@google.com>  2017-06-19 11:21:02 -0700
committer Craig Tiller <ctiller@google.com>  2017-06-19 11:21:02 -0700
commit    e7183b77a69ba6f28ed1e9aad2639a7bcaaa66a2 (patch)
tree      69656ab72706a1348b64296a1efce14369ae74c5 /test/cpp/qps
parent    6a23105aa93454b3fdddff092dfe26c96eda0d6e (diff)
parent    070a8eeb281a2659501a60b1bbc86798fcb652c4 (diff)
Merge github.com:grpc/grpc into cq-drop
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--  test/cpp/qps/client_async.cc     | 47
-rw-r--r--  test/cpp/qps/qps_json_driver.cc  | 10
-rw-r--r--  test/cpp/qps/server_async.cc     | 18
3 files changed, 65 insertions(+), 10 deletions(-)
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 03e116e26c..f7dda0f758 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -55,6 +55,11 @@ class ClientRpcContext {
}
virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
+ void lock() { mu_.lock(); }
+ void unlock() { mu_.unlock(); }
+
+ private:
+ std::mutex mu_;
};
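[Editor's note] Giving ClientRpcContext its own lock()/unlock() makes it satisfy the standard BasicLockable requirement, so later hunks can guard a context directly with std::lock_guard<ClientRpcContext> instead of exposing the mutex. A minimal self-contained sketch of the idiom, using a hypothetical Context type that is not part of this patch:

    #include <mutex>

    // Any type exposing lock()/unlock() meets BasicLockable, so it can be
    // handed straight to std::lock_guard.
    class Context {
     public:
      void lock() { mu_.lock(); }
      void unlock() { mu_.unlock(); }

     private:
      std::mutex mu_;
    };

    int main() {
      Context ctx;
      std::lock_guard<Context> guard(ctx);  // locks ctx now, unlocks at scope exit
      // ... critical section serialized on ctx ...
      return 0;
    }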
template <class RequestType, class ResponseType>
@@ -106,6 +111,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -163,8 +169,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
num_async_threads_(NumThreads(config)) {
SetupLoadTest(config, num_async_threads_);
- for (int i = 0; i < num_async_threads_; i++) {
+ int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
+ int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
+ for (int i = 0; i < num_cqs; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
+ }
+
+ for (int i = 0; i < num_async_threads_; i++) {
+ cq_.emplace_back(i % cli_cqs_.size());
next_issuers_.emplace_back(NextIssuer(i));
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
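[Editor's note] The constructor now shares completion queues across client threads instead of giving each thread its own. With num_async_threads_ = 10 and threads_per_cq = 4, for example, num_cqs = ceil(10/4) = 3 and cq_ maps threads to queues round-robin as 0,1,2,0,1,2,0,1,2,0. A standalone sketch of that arithmetic, with illustrative constants standing in for the config:

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    int main() {
      const int num_threads = 10;    // stands in for num_async_threads_
      const int threads_per_cq = 4;  // stands in for config.threads_per_cq()
      const int tpc = std::max(1, threads_per_cq);        // 1 if unspecified
      const int num_cqs = (num_threads + tpc - 1) / tpc;  // ceiling division: 3
      std::vector<int> cq;
      for (int i = 0; i < num_threads; i++) {
        cq.push_back(i % num_cqs);  // 0,1,2,0,1,2,0,1,2,0
      }
      std::printf("%d threads share %d completion queues\n", num_threads, num_cqs);
      return 0;
    }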
@@ -231,20 +243,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
+ // We want to delete the context. However, it is possible that
+ // another thread that just initiated an action on this
+ // context still has its lock even though the action on the
+ // context has completed. To delay for that, just grab the
+ // lock for serialization. Take a new scope.
+ { std::lock_guard<ClientRpcContext> lctx(*ctx); }
delete ctx;
return true;
- } else if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[thread_idx].get());
- // delete the old version
+ }
+ bool del = false;
+
+ // Create a new scope for a lock_guard'ed region
+ {
+ std::lock_guard<ClientRpcContext> lctx(*ctx);
+ if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ // set the old version to delete
+ del = true;
+ }
+ }
+ if (del) {
delete ctx;
}
return true;
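[Editor's note] Two details of this hunk are easy to miss. First, the empty scope { std::lock_guard<ClientRpcContext> lctx(*ctx); } on the shutdown path simply waits for any thread still inside the context's lock to leave before the delete. Second, on the normal path the delete is deferred through the del flag because deleting ctx while holding its own mutex would destroy a locked mutex, which is undefined behavior. A reduced sketch of the pattern, with a hypothetical Ctx type:

    #include <mutex>

    struct Ctx {
      void lock() { mu_.lock(); }
      void unlock() { mu_.unlock(); }
      bool RunNextState() { return false; }  // pretend the RPC just finished
     private:
      std::mutex mu_;
    };

    void ProcessEvent(Ctx* ctx) {
      bool del = false;
      {
        std::lock_guard<Ctx> guard(*ctx);  // serialize against other threads
        if (!ctx->RunNextState()) {
          del = true;  // mark only; never delete under ctx's own lock
        }
      }  // guard released here
      if (del) {
        delete ctx;  // safe: the guard above has been released
      }
    }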
@@ -255,6 +283,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
+ std::vector<int> cq_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
@@ -377,6 +406,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}
@@ -515,6 +545,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromClientImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -632,6 +663,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromServerImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -774,6 +806,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextGenericStreamingImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index a946992100..590c22ec29 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -16,6 +16,7 @@
*
*/
+#include <fstream>
#include <iostream>
#include <memory>
#include <set>
@@ -57,6 +58,8 @@ DEFINE_string(qps_server_target_override, "",
"Override QPS server target to configure in client configs."
"Only applicable if there is a single benchmark server.");
+DEFINE_string(json_file_out, "", "File to write the JSON output to.");
+
namespace grpc {
namespace testing {
@@ -88,6 +91,13 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
*success = result->server_success(i);
}
+ if (FLAGS_json_file_out != "") {
+ std::ofstream json_outfile;
+ json_outfile.open(FLAGS_json_file_out);
+ json_outfile << "{\"qps\": " << result->summary().qps() << "}\n";
+ json_outfile.close();
+ }
+
return result;
}
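[Editor's note] With the new flag set, e.g. --json_file_out=/tmp/qps.json, the driver emits a one-line machine-readable summary of the form {"qps": <value>} alongside the usual report. A minimal sketch of a consumer reading it back, where the path is illustrative:

    #include <fstream>
    #include <iostream>
    #include <string>

    int main() {
      std::ifstream in("/tmp/qps.json");  // file written via --json_file_out
      std::string line;
      std::getline(in, line);
      std::cout << line << std::endl;     // e.g. {"qps": 123456.7}
      return 0;
    }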
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 9cb281740f..96d7e5ef74 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -16,6 +16,7 @@
*
*/
+#include <algorithm>
#include <forward_list>
#include <functional>
#include <memory>
@@ -89,9 +90,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
}
- for (int i = 0; i < num_threads; i++) {
+ int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
+ int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
+ for (int i = 0; i < num_cqs; i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
+ for (int i = 0; i < num_threads; i++) {
+ cq_.emplace_back(i % srv_cqs_.size());
+ }
if (config.resource_quota_size() > 0) {
builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
@@ -105,7 +111,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::placeholders::_2);
for (int i = 0; i < 5000; i++) {
- for (int j = 0; j < num_threads; j++) {
+ for (int j = 0; j < num_cqs; j++) {
if (request_unary_function) {
auto request_unary = std::bind(
request_unary_function, &async_service_, std::placeholders::_1,
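[Editor's note] The pre-registration loop now posts request slots per completion queue rather than per thread: each of the num_cqs queues receives 5000 contexts for every registered method kind. Since num_cqs <= num_threads under the new sharing scheme, the number of outstanding contexts stays proportional to the queues actually being polled.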
@@ -190,7 +196,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
@@ -199,6 +205,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ std::lock_guard<ServerRpcContext> l2(*ctx);
const bool still_going = ctx->RunNextState(ok);
// if this RPC context is done, refresh it
if (!still_going) {
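[Editor's note] Unlike the client, the server never deletes a context on this path: l2 serializes RunNextState against the other threads polling the same shared queue, and a finished context is recycled ("refreshed") via Reset() rather than destroyed, so no deferred-delete flag is needed here.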
@@ -211,9 +218,13 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
class ServerRpcContext {
public:
ServerRpcContext() {}
+ void lock() { mu_.lock(); }
+ void unlock() { mu_.unlock(); }
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
virtual void Reset() = 0; // start this back at a clean state
+ private:
+ std::mutex mu_;
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@@ -503,6 +514,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
+ std::vector<int> cq_;
ServiceType async_service_;
std::vector<std::unique_ptr<ServerRpcContext>> contexts_;