From 50eaf4671814f017ed44ce9a220025c87e1a1cc2 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 20 Dec 2018 17:35:12 -0800 Subject: Refactor benchmark initial channel wait for ready to be single threaded --- test/cpp/qps/client.h | 89 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 32 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 668d941916..73f91eed2d 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -429,13 +429,7 @@ class ClientImpl : public Client { config.server_targets(i % config.server_targets_size()), config, create_stub_, i); } - std::vector> connecting_threads; - for (auto& c : channels_) { - connecting_threads.emplace_back(c.WaitForReady()); - } - for (auto& t : connecting_threads) { - t->join(); - } + WaitForChannelsToConnect(); median_latency_collection_interval_seconds_ = config.median_latency_collection_interval_millis() / 1e3; ClientRequestCreator create_req(&request_, @@ -443,6 +437,61 @@ class ClientImpl : public Client { } virtual ~ClientImpl() {} + void WaitForChannelsToConnect() { + int connect_deadline_seconds = 10; + /* Allow optionally overriding connect_deadline in order + * to deal with benchmark environments in which the server + * can take a long time to become ready. */ + char* channel_connect_timeout_str = + gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); + if (channel_connect_timeout_str != nullptr && + strcmp(channel_connect_timeout_str, "") != 0) { + connect_deadline_seconds = atoi(channel_connect_timeout_str); + } + gpr_log(GPR_INFO, + "Waiting for up to %d seconds for all channels to connect", + connect_deadline_seconds); + gpr_free(channel_connect_timeout_str); + gpr_timespec connect_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN)); + CompletionQueue cq; + size_t num_remaining = 0; + for (auto& c : channels_) { + if (!c.is_inproc()) { + Channel* channel = c.get_channel(); + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + } else { + num_remaining++; + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + while (num_remaining > 0) { + bool ok = false; + void* tag = nullptr; + cq.Next(&tag, &ok); + Channel* channel = static_cast(tag); + if (!ok) { + gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline", + channel); + abort(); + } else { + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + num_remaining--; + } else { + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + } + protected: const int cores_; RequestType request_; @@ -485,31 +534,7 @@ class ClientImpl : public Client { } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } - - std::unique_ptr WaitForReady() { - return std::unique_ptr(new std::thread([this]() { - if (!is_inproc_) { - int connect_deadline = 10; - /* Allow optionally overriding connect_deadline in order - * to deal with benchmark environments in which the server - * can take a long time to become ready. */ - char* channel_connect_timeout_str = - gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); - if (channel_connect_timeout_str != nullptr && - strcmp(channel_connect_timeout_str, "") != 0) { - connect_deadline = atoi(channel_connect_timeout_str); - } - gpr_log(GPR_INFO, - "Waiting for up to %d seconds for the channel %p to connect", - connect_deadline, channel_.get()); - gpr_free(channel_connect_timeout_str); - GPR_ASSERT(channel_->WaitForConnected(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN)))); - gpr_log(GPR_INFO, "Channel %p connected!", channel_.get()); - } - })); - } + bool is_inproc() { return is_inproc_; } private: void set_channel_args(const ClientConfig& config, ChannelArguments* args) { -- cgit v1.2.3