diff options
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r-- | test/cpp/qps/client.h | 207 |
1 files changed, 118 insertions, 89 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 668d941916..ceb5cdd710 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -236,58 +236,7 @@ class Client { return 0; } - protected: - bool closed_loop_; - gpr_atm thread_pool_done_; - double median_latency_collection_interval_seconds_; // In seconds - - void StartThreads(size_t num_threads) { - gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false)); - threads_remaining_ = num_threads; - for (size_t i = 0; i < num_threads; i++) { - threads_.emplace_back(new Thread(this, i)); - } - } - - void EndThreads() { - MaybeStartRequests(); - threads_.clear(); - } - - virtual void DestroyMultithreading() = 0; - - void SetupLoadTest(const ClientConfig& config, size_t num_threads) { - // Set up the load distribution based on the number of threads - const auto& load = config.load_params(); - - std::unique_ptr<RandomDistInterface> random_dist; - switch (load.load_case()) { - case LoadParams::kClosedLoop: - // Closed-loop doesn't use random dist at all - break; - case LoadParams::kPoisson: - random_dist.reset( - new ExpDist(load.poisson().offered_load() / num_threads)); - break; - default: - GPR_ASSERT(false); - } - - // Set closed_loop_ based on whether or not random_dist is set - if (!random_dist) { - closed_loop_ = true; - } else { - closed_loop_ = false; - // set up interarrival timer according to random dist - interarrival_timer_.init(*random_dist, num_threads); - const auto now = gpr_now(GPR_CLOCK_MONOTONIC); - for (size_t i = 0; i < num_threads; i++) { - next_time_.push_back(gpr_time_add( - now, - gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN))); - } - } - } + bool IsClosedLoop() { return closed_loop_; } gpr_timespec NextIssueTime(int thread_idx) { const gpr_timespec result = next_time_[thread_idx]; @@ -297,9 +246,9 @@ class Client { GPR_TIMESPAN)); return result; } - std::function<gpr_timespec()> NextIssuer(int thread_idx) { - return closed_loop_ ? std::function<gpr_timespec()>() - : std::bind(&Client::NextIssueTime, this, thread_idx); + + bool ThreadCompleted() { + return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_)); } class Thread { @@ -380,8 +329,62 @@ class Client { double interval_start_time_; }; - bool ThreadCompleted() { - return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_)); + protected: + bool closed_loop_; + gpr_atm thread_pool_done_; + double median_latency_collection_interval_seconds_; // In seconds + + void StartThreads(size_t num_threads) { + gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false)); + threads_remaining_ = num_threads; + for (size_t i = 0; i < num_threads; i++) { + threads_.emplace_back(new Thread(this, i)); + } + } + + void EndThreads() { + MaybeStartRequests(); + threads_.clear(); + } + + virtual void DestroyMultithreading() = 0; + + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + const auto& load = config.load_params(); + + std::unique_ptr<RandomDistInterface> random_dist; + switch (load.load_case()) { + case LoadParams::kClosedLoop: + // Closed-loop doesn't use random dist at all + break; + case LoadParams::kPoisson: + random_dist.reset( + new ExpDist(load.poisson().offered_load() / num_threads)); + break; + default: + GPR_ASSERT(false); + } + + // Set closed_loop_ based on whether or not random_dist is set + if (!random_dist) { + closed_loop_ = true; + } else { + closed_loop_ = false; + // set up interarrival timer according to random dist + interarrival_timer_.init(*random_dist, num_threads); + const auto now = gpr_now(GPR_CLOCK_MONOTONIC); + for (size_t i = 0; i < num_threads; i++) { + next_time_.push_back(gpr_time_add( + now, + gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN))); + } + } + } + + std::function<gpr_timespec()> NextIssuer(int thread_idx) { + return closed_loop_ ? std::function<gpr_timespec()>() + : std::bind(&Client::NextIssueTime, this, thread_idx); } virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; @@ -429,19 +432,69 @@ class ClientImpl : public Client { config.server_targets(i % config.server_targets_size()), config, create_stub_, i); } - std::vector<std::unique_ptr<std::thread>> connecting_threads; - for (auto& c : channels_) { - connecting_threads.emplace_back(c.WaitForReady()); - } - for (auto& t : connecting_threads) { - t->join(); - } + WaitForChannelsToConnect(); median_latency_collection_interval_seconds_ = config.median_latency_collection_interval_millis() / 1e3; ClientRequestCreator<RequestType> create_req(&request_, config.payload_config()); } virtual ~ClientImpl() {} + const RequestType* request() { return &request_; } + + void WaitForChannelsToConnect() { + int connect_deadline_seconds = 10; + /* Allow optionally overriding connect_deadline in order + * to deal with benchmark environments in which the server + * can take a long time to become ready. */ + char* channel_connect_timeout_str = + gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); + if (channel_connect_timeout_str != nullptr && + strcmp(channel_connect_timeout_str, "") != 0) { + connect_deadline_seconds = atoi(channel_connect_timeout_str); + } + gpr_log(GPR_INFO, + "Waiting for up to %d seconds for all channels to connect", + connect_deadline_seconds); + gpr_free(channel_connect_timeout_str); + gpr_timespec connect_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN)); + CompletionQueue cq; + size_t num_remaining = 0; + for (auto& c : channels_) { + if (!c.is_inproc()) { + Channel* channel = c.get_channel(); + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + } else { + num_remaining++; + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + while (num_remaining > 0) { + bool ok = false; + void* tag = nullptr; + cq.Next(&tag, &ok); + Channel* channel = static_cast<Channel*>(tag); + if (!ok) { + gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline", + channel); + abort(); + } else { + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + num_remaining--; + } else { + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + } protected: const int cores_; @@ -485,31 +538,7 @@ class ClientImpl : public Client { } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } - - std::unique_ptr<std::thread> WaitForReady() { - return std::unique_ptr<std::thread>(new std::thread([this]() { - if (!is_inproc_) { - int connect_deadline = 10; - /* Allow optionally overriding connect_deadline in order - * to deal with benchmark environments in which the server - * can take a long time to become ready. */ - char* channel_connect_timeout_str = - gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); - if (channel_connect_timeout_str != nullptr && - strcmp(channel_connect_timeout_str, "") != 0) { - connect_deadline = atoi(channel_connect_timeout_str); - } - gpr_log(GPR_INFO, - "Waiting for up to %d seconds for the channel %p to connect", - connect_deadline, channel_.get()); - gpr_free(channel_connect_timeout_str); - GPR_ASSERT(channel_->WaitForConnected(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN)))); - gpr_log(GPR_INFO, "Channel %p connected!", channel_.get()); - } - })); - } + bool is_inproc() { return is_inproc_; } private: void set_channel_args(const ClientConfig& config, ChannelArguments* args) { |