aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_async.cc
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2015-05-12 10:29:07 -0700
committerGravatar vjpai <vpai@google.com>2015-05-12 10:29:07 -0700
commit37f7257529877084c76e2e607c33a384640c5e50 (patch)
treee75fb5cb754afb07bbb7b02fd5f51d8a18fdc1b2 /test/cpp/qps/client_async.cc
parent0426ad0fcb3dbcd9f46621eedc226c424088e189 (diff)
WIP
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r--test/cpp/qps/client_async.cc121
1 files changed, 97 insertions, 24 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index e3ab57728d..fa1a799f1b 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -55,6 +55,10 @@
namespace grpc {
namespace testing {
+typedef std::chrono::high_resolution_clock grpc_time_source;
+typedef std::chrono::time_point<grpc_time_source> grpc_time;
+typedef std::forward_list<grpc_time> deadline_list;
+
class ClientRpcContext {
public:
ClientRpcContext() {}
@@ -66,6 +70,12 @@ class ClientRpcContext {
static ClientRpcContext* detag(void* t) {
return reinterpret_cast<ClientRpcContext*>(t);
}
+
+ deadline_list::iterator deadline_posn() const {return deadline_posn_;}
+ void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;}
+ virtual void Start() = 0;
+ private:
+ deadline_list::iterator deadline_posn_;
};
template <class RequestType, class ResponseType>
@@ -84,9 +94,11 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_(),
next_state_(&ClientRpcContextUnaryImpl::RespDone),
callback_(on_done),
- start_req_(start_req),
- start_(Timer::Now()),
- response_reader_(start_req(stub_, &context_, req_)) {
+ start_req_(start_req) {
+ }
+ void Start() GRPC_OVERRIDE {
+ start_ = Timer::Now();
+ response_reader_.reset(start_req(stub_, &context_, req_));
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
@@ -133,14 +145,32 @@ class AsyncClient : public Client {
Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
+ if (!closed_loop_) {
+ rpc_deadlines_.emplace_back();
+ next_channel_.push_back(i % channel_count_);
+ issue_allowed_.push_back(true);
+
+ grpc_time next_issue;
+ NextIssueTime(i, &next_issue);
+ next_issue_.push_back(next_issue);
+ }
}
- int t = 0;
- for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
+ if (!closed_loop_) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
- auto* cq = cli_cqs_[t].get();
- t = (t + 1) % cli_cqs_.size();
- setup_ctx(cq, channel->get_stub(), request_);
+ channel_rpc_count_lock.emplace_back();
+ rpcs_outstanding_.push_back(0);
+ }
+ }
+ else {
+ int t = 0;
+ for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
+ for (auto channel = channels_.begin(); channel != channels_.end();
+ channel++) {
+ auto* cq = cli_cqs_[t].get();
+ t = (t + 1) % cli_cqs_.size();
+ setup_ctx(cq, channel->get_stub(), request_);
+ }
}
}
}
@@ -159,26 +189,68 @@ class AsyncClient : public Client {
GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok,
- std::chrono::system_clock::now() +
- std::chrono::seconds(1))) {
- case CompletionQueue::SHUTDOWN: return false;
- case CompletionQueue::TIMEOUT: return true;
- case CompletionQueue::GOT_EVENT: break;
+ grpc_time deadline, short_deadline;
+ if (closed_loop_) {
+ deadline = grpc_time_source::now() + std::chrono::seconds(1);
+ short_deadline = deadline;
+ } else {
+ deadline = *(rpc_deadlines_[thread_idx].begin());
+ short_deadline = issue_allowed_[thread_idx] ?
+ next_issue_[thread_idx] : deadline;
}
-
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (ctx->RunNextState(ok, histogram) == false) {
- // call the callback and then delete it
- ctx->RunNextState(ok, histogram);
- ctx->StartNewClone();
- delete ctx;
+ switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
+ case CompletionQueue::SHUTDOWN: return false;
+ case CompletionQueue::TIMEOUT:
+ got_event = false;
+ break;
+ case CompletionQueue::GOT_EVENT:
+ got_event = true;
+ break;
}
-
- return true;
+ if (grpc_time_source::now() > deadline) {
+ // we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
+ return false;
+ }
+ if (got_event) {
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ if (ctx->RunNextState(ok, histogram) == false) {
+ // call the callback and then delete it
+ rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn());
+ ctx->RunNextState(ok, histogram);
+ ctx->StartNewClone();
+ delete ctx;
+ }
+ issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
+ }
+ if (issue_allowed && grpc_time_source::now() >= next_issue_[thread_idx]) {
+ // Attempt to issue
+ bool issued = false;
+ for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
+ num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) {
+ std::lock_guard g(channel_rpc_count_lock_[next_channel_[thread_idx]]);
+ if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
+ // do the work to issue
+ rpcs_outstanding[next_channel_[thread_idx]]++;
+ issued = true;
+ }
+ }
+ if (!issued)
+ issue_allowed = false;
+ }
+ return true;
}
private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
+
+ std::vector<deadline_list> rpc_deadlines_; // per thread deadlines
+ std::vector<int> next_channel_; // per thread round-robin channel ctr
+ std::vector<bool> issue_allowed_; // may this thread attempt to issue
+ std::vector<grpc_time> next_issue_; // when should it issue?
+
+ std::vector<std::mutex> channel_rpc_count_lock_;
+ std::vector<int> rpcs_outstanding_; // per-channel vector
+ int max_outstanding_per_channel_;
+ int channel_count_;
};
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
@@ -199,6 +271,7 @@ private:
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, req, start_req, check_done);
}
+
};
template <class RequestType, class ResponseType>
@@ -227,7 +300,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
void StartNewClone() GRPC_OVERRIDE {
new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_);
}
-
+ void Start() GRPC_OVERRIDE {}
private:
bool ReqSent(bool ok, Histogram *) {
return StartWrite(ok);