diff options
author | vjpai <vpai@google.com> | 2015-06-02 10:26:52 -0700 |
---|---|---|
committer | vjpai <vpai@google.com> | 2015-06-02 10:26:52 -0700 |
commit | 924d459c271acb6cf940da8183bcb8eaacb6b662 (patch) | |
tree | 344e9b84fe979dab6e7887ed2a3588637a5677fc /test/cpp/qps | |
parent | 1795985322eb8b4e689c5e175d9d6937839f817c (diff) |
Fix timer issues
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client.h | 15 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 21 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 2 |
3 files changed, 21 insertions, 17 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2b227ec909..dd37b88fb4 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -45,6 +45,9 @@ namespace grpc { namespace testing { +typedef std::chrono::system_clock grpc_time_source; +typedef std::chrono::time_point<grpc_time_source> grpc_time; + class Client { public: explicit Client(const ClientConfig& config) : timer_(new Timer), @@ -145,19 +148,18 @@ class Client { interarrival_timer_.init(*random_dist, num_threads); for (size_t i = 0; i<num_threads; i++) { - next_time_.push_back(std::chrono::high_resolution_clock::now() - + interarrival_timer_(i)); + next_time_.push_back(grpc_time_source::now() + + std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(i))); } } } - template<class Timepoint> - bool NextIssueTime(int thread_idx, Timepoint *time_delay) { + bool NextIssueTime(int thread_idx, grpc_time *time_delay) { if (closed_loop_) { return false; } else { *time_delay = next_time_[thread_idx]; - next_time_[thread_idx] += interarrival_timer_(thread_idx); + next_time_[thread_idx] += std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(thread_idx)); return true; } } @@ -226,8 +228,7 @@ class Client { std::unique_ptr<Timer> timer_; InterarrivalTimer interarrival_timer_; - std::vector<std::chrono::time_point - <std::chrono::high_resolution_clock>> next_time_; + std::vector<grpc_time> next_time_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 2d23192767..bd77424578 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -32,8 +32,10 @@ */ #include <cassert> +#include <forward_list> #include <functional> #include <memory> +#include <mutex> #include <string> #include <thread> #include <vector> @@ -55,8 +57,6 @@ namespace grpc { namespace testing { -typedef std::chrono::high_resolution_clock grpc_time_source; -typedef std::chrono::time_point<grpc_time_source> grpc_time; typedef std::forward_list<grpc_time> deadline_list; class ClientRpcContext { @@ -98,7 +98,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } void Start() GRPC_OVERRIDE { start_ = Timer::Now(); - response_reader_.reset(start_req(stub_, &context_, req_)); + response_reader_ = start_req_(stub_, &context_, req_); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -142,7 +142,7 @@ class AsyncClient : public Client { explicit AsyncClient(const ClientConfig& config, std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*, const SimpleRequest&)> setup_ctx) : - Client(config) { + Client(config), channel_rpc_lock_(config.client_channels()) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); if (!closed_loop_) { @@ -158,7 +158,6 @@ class AsyncClient : public Client { if (!closed_loop_) { for (auto channel = channels_.begin(); channel != channels_.end(); channel++) { - channel_rpc_lock_.emplace_back(); rpcs_outstanding_.push_back(0); } } @@ -202,6 +201,9 @@ class AsyncClient : public Client { short_deadline = issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; } + + bool got_event; + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: @@ -232,15 +234,16 @@ class AsyncClient : public Client { bool issued = false; for (int num_attempts = 0; num_attempts < channel_count_ && !issued; num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { - std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]); - if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) { + std::lock_guard<std::mutex> + g(channel_rpc_lock_[next_channel_[thread_idx]]); + if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) { // do the work to issue - rpcs_outstanding[next_channel_[thread_idx]]++; + rpcs_outstanding_[next_channel_[thread_idx]]++; issued = true; } } if (!issued) - issue_allowed = false; + issue_allowed_[thread_idx] = false; } return true; } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 98297d3abb..d1682caf06 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -77,7 +77,7 @@ class SynchronousClient : public Client { protected: void WaitToIssue(int thread_idx) { - std::chrono::time_point<std::chrono::high_resolution_clock> next_time; + grpc_time next_time; if (NextIssueTime(thread_idx, &next_time)) { std::this_thread::sleep_until(next_time); } |