diff options
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r-- | test/cpp/qps/client.h | 93 |
1 files changed, 60 insertions, 33 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 7fbaf63492..82c6361abd 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -37,10 +37,14 @@ #include "src/cpp/util/core_stats.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" +#include "test/cpp/qps/qps_worker.h" +#include "test/cpp/qps/server.h" #include "test/cpp/qps/usage_timer.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/test_credentials_provider.h" +#define INPROC_NAME_PREFIX "qpsinproc:" + namespace grpc { namespace testing { @@ -226,7 +230,6 @@ class Client { } virtual void DestroyMultithreading() = 0; - virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -274,7 +277,6 @@ class Client { : std::bind(&Client::NextIssueTime, this, thread_idx); } - private: class Thread { public: Thread(Client* client, size_t idx) @@ -294,39 +296,33 @@ class Client { MergeStatusHistogram(statuses_, s); } + void UpdateHistogram(HistogramEntry* entry) { + std::lock_guard<std::mutex> g(mu_); + if (entry->value_used()) { + histogram_.Add(entry->value()); + } + if (entry->status_used()) { + statuses_[entry->status()]++; + } + } + private: Thread(const Thread&); Thread& operator=(const Thread&); void ThreadFunc() { + int wait_loop = 0; while (!gpr_event_wait( &client_->start_requests_, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(1, GPR_TIMESPAN)))) { - gpr_log(GPR_INFO, "Waiting for benchmark to start"); + gpr_time_from_seconds(20, GPR_TIMESPAN)))) { + gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)", + idx_, wait_loop); + wait_loop++; } - for (;;) { - // run the loop body - HistogramEntry entry; - const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); - // lock, update histogram if needed and see if we're done - std::lock_guard<std::mutex> g(mu_); - if (entry.value_used()) { - histogram_.Add(entry.value()); - } - if (entry.status_used()) { - statuses_[entry.status()]++; - } - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } - if (!thread_still_ok || - static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) { - client_->CompleteThread(); - return; - } - } + client_->ThreadFunc(idx_, this); + client_->CompleteThread(); } std::mutex mu_; @@ -337,6 +333,12 @@ class Client { std::thread impl_; }; + bool ThreadCompleted() { + return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_)); + } + + virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; + std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<UsageTimer> timer_; @@ -380,6 +382,13 @@ class ClientImpl : public Client { config.server_targets(i % config.server_targets_size()), config, create_stub_, i); } + std::vector<std::unique_ptr<std::thread>> connecting_threads; + for (auto& c : channels_) { + connecting_threads.emplace_back(c.WaitForReady()); + } + for (auto& t : connecting_threads) { + t->join(); + } ClientRequestCreator<RequestType> create_req(&request_, config.payload_config()); @@ -409,19 +418,36 @@ class ClientImpl : public Client { type = config.security_params().cred_type(); } - channel_ = CreateTestChannel( - target, type, config.security_params().server_host_override(), - !config.security_params().use_test_ca(), - std::shared_ptr<CallCredentials>(), args); - gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); - GPR_ASSERT(channel_->WaitForConnected( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(300, GPR_TIMESPAN)))); + grpc::string inproc_pfx(INPROC_NAME_PREFIX); + if (target.find(inproc_pfx) != 0) { + channel_ = CreateTestChannel( + target, type, config.security_params().server_host_override(), + !config.security_params().use_test_ca(), + std::shared_ptr<CallCredentials>(), args); + gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); + is_inproc_ = false; + } else { + grpc::string tgt = target; + tgt.erase(0, inproc_pfx.length()); + int srv_num = std::stoi(tgt); + channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args); + is_inproc_ = true; + } stub_ = create_stub(channel_); } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } + std::unique_ptr<std::thread> WaitForReady() { + return std::unique_ptr<std::thread>(new std::thread([this]() { + if (!is_inproc_) { + GPR_ASSERT(channel_->WaitForConnected( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(10, GPR_TIMESPAN)))); + } + })); + } + private: void set_channel_args(const ClientConfig& config, ChannelArguments* args) { for (auto channel_arg : config.channel_args()) { @@ -437,6 +463,7 @@ class ClientImpl : public Client { std::shared_ptr<Channel> channel_; std::unique_ptr<StubType> stub_; + bool is_inproc_; }; std::vector<ClientChannelInfo> channels_; std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)> |