aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2015-06-02 10:26:52 -0700
committerGravatar vjpai <vpai@google.com>2015-06-02 10:26:52 -0700
commit924d459c271acb6cf940da8183bcb8eaacb6b662 (patch)
tree344e9b84fe979dab6e7887ed2a3588637a5677fc /test/cpp/qps
parent1795985322eb8b4e689c5e175d9d6937839f817c (diff)
Fix timer issues
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/client.h15
-rw-r--r--test/cpp/qps/client_async.cc21
-rw-r--r--test/cpp/qps/client_sync.cc2
3 files changed, 21 insertions, 17 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2b227ec909..dd37b88fb4 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -45,6 +45,9 @@
namespace grpc {
namespace testing {
+typedef std::chrono::system_clock grpc_time_source;
+typedef std::chrono::time_point<grpc_time_source> grpc_time;
+
class Client {
public:
explicit Client(const ClientConfig& config) : timer_(new Timer),
@@ -145,19 +148,18 @@ class Client {
interarrival_timer_.init(*random_dist, num_threads);
for (size_t i = 0; i<num_threads; i++) {
- next_time_.push_back(std::chrono::high_resolution_clock::now()
- + interarrival_timer_(i));
+ next_time_.push_back(grpc_time_source::now() +
+ std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(i)));
}
}
}
- template<class Timepoint>
- bool NextIssueTime(int thread_idx, Timepoint *time_delay) {
+ bool NextIssueTime(int thread_idx, grpc_time *time_delay) {
if (closed_loop_) {
return false;
}
else {
*time_delay = next_time_[thread_idx];
- next_time_[thread_idx] += interarrival_timer_(thread_idx);
+ next_time_[thread_idx] += std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(thread_idx));
return true;
}
}
@@ -226,8 +228,7 @@ class Client {
std::unique_ptr<Timer> timer_;
InterarrivalTimer interarrival_timer_;
- std::vector<std::chrono::time_point
- <std::chrono::high_resolution_clock>> next_time_;
+ std::vector<grpc_time> next_time_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 2d23192767..bd77424578 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -32,8 +32,10 @@
*/
#include <cassert>
+#include <forward_list>
#include <functional>
#include <memory>
+#include <mutex>
#include <string>
#include <thread>
#include <vector>
@@ -55,8 +57,6 @@
namespace grpc {
namespace testing {
-typedef std::chrono::high_resolution_clock grpc_time_source;
-typedef std::chrono::time_point<grpc_time_source> grpc_time;
typedef std::forward_list<grpc_time> deadline_list;
class ClientRpcContext {
@@ -98,7 +98,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
}
void Start() GRPC_OVERRIDE {
start_ = Timer::Now();
- response_reader_.reset(start_req(stub_, &context_, req_));
+ response_reader_ = start_req_(stub_, &context_, req_);
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
@@ -142,7 +142,7 @@ class AsyncClient : public Client {
explicit AsyncClient(const ClientConfig& config,
std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*,
const SimpleRequest&)> setup_ctx) :
- Client(config) {
+ Client(config), channel_rpc_lock_(config.client_channels()) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
if (!closed_loop_) {
@@ -158,7 +158,6 @@ class AsyncClient : public Client {
if (!closed_loop_) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
- channel_rpc_lock_.emplace_back();
rpcs_outstanding_.push_back(0);
}
}
@@ -202,6 +201,9 @@ class AsyncClient : public Client {
short_deadline = issue_allowed_[thread_idx] ?
next_issue_[thread_idx] : deadline;
}
+
+ bool got_event;
+
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT:
@@ -232,15 +234,16 @@ class AsyncClient : public Client {
bool issued = false;
for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) {
- std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]);
- if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
+ std::lock_guard<std::mutex>
+ g(channel_rpc_lock_[next_channel_[thread_idx]]);
+ if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
// do the work to issue
- rpcs_outstanding[next_channel_[thread_idx]]++;
+ rpcs_outstanding_[next_channel_[thread_idx]]++;
issued = true;
}
}
if (!issued)
- issue_allowed = false;
+ issue_allowed_[thread_idx] = false;
}
return true;
}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 98297d3abb..d1682caf06 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -77,7 +77,7 @@ class SynchronousClient : public Client {
protected:
void WaitToIssue(int thread_idx) {
- std::chrono::time_point<std::chrono::high_resolution_clock> next_time;
+ grpc_time next_time;
if (NextIssueTime(thread_idx, &next_time)) {
std::this_thread::sleep_until(next_time);
}