aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client.h
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r--test/cpp/qps/client.h207
1 files changed, 118 insertions, 89 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 668d941916..ceb5cdd710 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -236,58 +236,7 @@ class Client {
return 0;
}
- protected:
- bool closed_loop_;
- gpr_atm thread_pool_done_;
- double median_latency_collection_interval_seconds_; // In seconds
-
- void StartThreads(size_t num_threads) {
- gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
- threads_remaining_ = num_threads;
- for (size_t i = 0; i < num_threads; i++) {
- threads_.emplace_back(new Thread(this, i));
- }
- }
-
- void EndThreads() {
- MaybeStartRequests();
- threads_.clear();
- }
-
- virtual void DestroyMultithreading() = 0;
-
- void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
- // Set up the load distribution based on the number of threads
- const auto& load = config.load_params();
-
- std::unique_ptr<RandomDistInterface> random_dist;
- switch (load.load_case()) {
- case LoadParams::kClosedLoop:
- // Closed-loop doesn't use random dist at all
- break;
- case LoadParams::kPoisson:
- random_dist.reset(
- new ExpDist(load.poisson().offered_load() / num_threads));
- break;
- default:
- GPR_ASSERT(false);
- }
-
- // Set closed_loop_ based on whether or not random_dist is set
- if (!random_dist) {
- closed_loop_ = true;
- } else {
- closed_loop_ = false;
- // set up interarrival timer according to random dist
- interarrival_timer_.init(*random_dist, num_threads);
- const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
- for (size_t i = 0; i < num_threads; i++) {
- next_time_.push_back(gpr_time_add(
- now,
- gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
- }
- }
- }
+ bool IsClosedLoop() { return closed_loop_; }
gpr_timespec NextIssueTime(int thread_idx) {
const gpr_timespec result = next_time_[thread_idx];
@@ -297,9 +246,9 @@ class Client {
GPR_TIMESPAN));
return result;
}
- std::function<gpr_timespec()> NextIssuer(int thread_idx) {
- return closed_loop_ ? std::function<gpr_timespec()>()
- : std::bind(&Client::NextIssueTime, this, thread_idx);
+
+ bool ThreadCompleted() {
+ return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
}
class Thread {
@@ -380,8 +329,62 @@ class Client {
double interval_start_time_;
};
- bool ThreadCompleted() {
- return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+ protected:
+ bool closed_loop_;
+ gpr_atm thread_pool_done_;
+ double median_latency_collection_interval_seconds_; // In seconds
+
+ void StartThreads(size_t num_threads) {
+ gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
+ threads_remaining_ = num_threads;
+ for (size_t i = 0; i < num_threads; i++) {
+ threads_.emplace_back(new Thread(this, i));
+ }
+ }
+
+ void EndThreads() {
+ MaybeStartRequests();
+ threads_.clear();
+ }
+
+ virtual void DestroyMultithreading() = 0;
+
+ void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+ // Set up the load distribution based on the number of threads
+ const auto& load = config.load_params();
+
+ std::unique_ptr<RandomDistInterface> random_dist;
+ switch (load.load_case()) {
+ case LoadParams::kClosedLoop:
+ // Closed-loop doesn't use random dist at all
+ break;
+ case LoadParams::kPoisson:
+ random_dist.reset(
+ new ExpDist(load.poisson().offered_load() / num_threads));
+ break;
+ default:
+ GPR_ASSERT(false);
+ }
+
+ // Set closed_loop_ based on whether or not random_dist is set
+ if (!random_dist) {
+ closed_loop_ = true;
+ } else {
+ closed_loop_ = false;
+ // set up interarrival timer according to random dist
+ interarrival_timer_.init(*random_dist, num_threads);
+ const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (size_t i = 0; i < num_threads; i++) {
+ next_time_.push_back(gpr_time_add(
+ now,
+ gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
+ }
+ }
+ }
+
+ std::function<gpr_timespec()> NextIssuer(int thread_idx) {
+ return closed_loop_ ? std::function<gpr_timespec()>()
+ : std::bind(&Client::NextIssueTime, this, thread_idx);
}
virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
@@ -429,19 +432,69 @@ class ClientImpl : public Client {
config.server_targets(i % config.server_targets_size()), config,
create_stub_, i);
}
- std::vector<std::unique_ptr<std::thread>> connecting_threads;
- for (auto& c : channels_) {
- connecting_threads.emplace_back(c.WaitForReady());
- }
- for (auto& t : connecting_threads) {
- t->join();
- }
+ WaitForChannelsToConnect();
median_latency_collection_interval_seconds_ =
config.median_latency_collection_interval_millis() / 1e3;
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
}
virtual ~ClientImpl() {}
+ const RequestType* request() { return &request_; }
+
+ void WaitForChannelsToConnect() {
+ int connect_deadline_seconds = 10;
+ /* Allow optionally overriding connect_deadline in order
+ * to deal with benchmark environments in which the server
+ * can take a long time to become ready. */
+ char* channel_connect_timeout_str =
+ gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
+ if (channel_connect_timeout_str != nullptr &&
+ strcmp(channel_connect_timeout_str, "") != 0) {
+ connect_deadline_seconds = atoi(channel_connect_timeout_str);
+ }
+ gpr_log(GPR_INFO,
+ "Waiting for up to %d seconds for all channels to connect",
+ connect_deadline_seconds);
+ gpr_free(channel_connect_timeout_str);
+ gpr_timespec connect_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
+ CompletionQueue cq;
+ size_t num_remaining = 0;
+ for (auto& c : channels_) {
+ if (!c.is_inproc()) {
+ Channel* channel = c.get_channel();
+ grpc_connectivity_state last_observed = channel->GetState(true);
+ if (last_observed == GRPC_CHANNEL_READY) {
+ gpr_log(GPR_INFO, "Channel %p connected!", channel);
+ } else {
+ num_remaining++;
+ channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
+ channel);
+ }
+ }
+ }
+ while (num_remaining > 0) {
+ bool ok = false;
+ void* tag = nullptr;
+ cq.Next(&tag, &ok);
+ Channel* channel = static_cast<Channel*>(tag);
+ if (!ok) {
+ gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline",
+ channel);
+ abort();
+ } else {
+ grpc_connectivity_state last_observed = channel->GetState(true);
+ if (last_observed == GRPC_CHANNEL_READY) {
+ gpr_log(GPR_INFO, "Channel %p connected!", channel);
+ num_remaining--;
+ } else {
+ channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
+ channel);
+ }
+ }
+ }
+ }
protected:
const int cores_;
@@ -485,31 +538,7 @@ class ClientImpl : public Client {
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
-
- std::unique_ptr<std::thread> WaitForReady() {
- return std::unique_ptr<std::thread>(new std::thread([this]() {
- if (!is_inproc_) {
- int connect_deadline = 10;
- /* Allow optionally overriding connect_deadline in order
- * to deal with benchmark environments in which the server
- * can take a long time to become ready. */
- char* channel_connect_timeout_str =
- gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
- if (channel_connect_timeout_str != nullptr &&
- strcmp(channel_connect_timeout_str, "") != 0) {
- connect_deadline = atoi(channel_connect_timeout_str);
- }
- gpr_log(GPR_INFO,
- "Waiting for up to %d seconds for the channel %p to connect",
- connect_deadline, channel_.get());
- gpr_free(channel_connect_timeout_str);
- GPR_ASSERT(channel_->WaitForConnected(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN))));
- gpr_log(GPR_INFO, "Channel %p connected!", channel_.get());
- }
- }));
- }
+ bool is_inproc() { return is_inproc_; }
private:
void set_channel_args(const ClientConfig& config, ChannelArguments* args) {