aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client.h
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r--test/cpp/qps/client.h93
1 files changed, 60 insertions, 33 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 7fbaf63492..82c6361abd 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -37,10 +37,14 @@
#include "src/cpp/util/core_stats.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
+#include "test/cpp/qps/qps_worker.h"
+#include "test/cpp/qps/server.h"
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_credentials_provider.h"
+#define INPROC_NAME_PREFIX "qpsinproc:"
+
namespace grpc {
namespace testing {
@@ -226,7 +230,6 @@ class Client {
}
virtual void DestroyMultithreading() = 0;
- virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -274,7 +277,6 @@ class Client {
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
- private:
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -294,39 +296,33 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
+ void UpdateHistogram(HistogramEntry* entry) {
+ std::lock_guard<std::mutex> g(mu_);
+ if (entry->value_used()) {
+ histogram_.Add(entry->value());
+ }
+ if (entry->status_used()) {
+ statuses_[entry->status()]++;
+ }
+ }
+
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
void ThreadFunc() {
+ int wait_loop = 0;
while (!gpr_event_wait(
&client_->start_requests_,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(1, GPR_TIMESPAN)))) {
- gpr_log(GPR_INFO, "Waiting for benchmark to start");
+ gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
+ gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
+ idx_, wait_loop);
+ wait_loop++;
}
- for (;;) {
- // run the loop body
- HistogramEntry entry;
- const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
- // lock, update histogram if needed and see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (entry.value_used()) {
- histogram_.Add(entry.value());
- }
- if (entry.status_used()) {
- statuses_[entry.status()]++;
- }
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
- if (!thread_still_ok ||
- static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
- client_->CompleteThread();
- return;
- }
- }
+ client_->ThreadFunc(idx_, this);
+ client_->CompleteThread();
}
std::mutex mu_;
@@ -337,6 +333,12 @@ class Client {
std::thread impl_;
};
+ bool ThreadCompleted() {
+ return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+ }
+
+ virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
+
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<UsageTimer> timer_;
@@ -380,6 +382,13 @@ class ClientImpl : public Client {
config.server_targets(i % config.server_targets_size()), config,
create_stub_, i);
}
+ std::vector<std::unique_ptr<std::thread>> connecting_threads;
+ for (auto& c : channels_) {
+ connecting_threads.emplace_back(c.WaitForReady());
+ }
+ for (auto& t : connecting_threads) {
+ t->join();
+ }
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
@@ -409,19 +418,36 @@ class ClientImpl : public Client {
type = config.security_params().cred_type();
}
- channel_ = CreateTestChannel(
- target, type, config.security_params().server_host_override(),
- !config.security_params().use_test_ca(),
- std::shared_ptr<CallCredentials>(), args);
- gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
- GPR_ASSERT(channel_->WaitForConnected(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(300, GPR_TIMESPAN))));
+ grpc::string inproc_pfx(INPROC_NAME_PREFIX);
+ if (target.find(inproc_pfx) != 0) {
+ channel_ = CreateTestChannel(
+ target, type, config.security_params().server_host_override(),
+ !config.security_params().use_test_ca(),
+ std::shared_ptr<CallCredentials>(), args);
+ gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
+ is_inproc_ = false;
+ } else {
+ grpc::string tgt = target;
+ tgt.erase(0, inproc_pfx.length());
+ int srv_num = std::stoi(tgt);
+ channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
+ is_inproc_ = true;
+ }
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
+ std::unique_ptr<std::thread> WaitForReady() {
+ return std::unique_ptr<std::thread>(new std::thread([this]() {
+ if (!is_inproc_) {
+ GPR_ASSERT(channel_->WaitForConnected(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(10, GPR_TIMESPAN))));
+ }
+ }));
+ }
+
private:
void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
for (auto channel_arg : config.channel_args()) {
@@ -437,6 +463,7 @@ class ClientImpl : public Client {
std::shared_ptr<Channel> channel_;
std::unique_ptr<StubType> stub_;
+ bool is_inproc_;
};
std::vector<ClientChannelInfo> channels_;
std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>