aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_callback.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client_callback.cc')
-rw-r--r--test/cpp/qps/client_callback.cc35
1 files changed, 21 insertions, 14 deletions
diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc
index 00d5853a8e..1880f46d43 100644
--- a/test/cpp/qps/client_callback.cc
+++ b/test/cpp/qps/client_callback.cc
@@ -66,7 +66,10 @@ class CallbackClient
config, BenchmarkStubCreator) {
num_threads_ = NumThreads(config);
rpcs_done_ = 0;
- SetupLoadTest(config, num_threads_);
+
+ // Don't divide the fixed load among threads as the user threads
+ // only bootstrap the RPCs
+ SetupLoadTest(config, 1);
total_outstanding_rpcs_ =
config.client_channels() * config.outstanding_rpcs_per_channel();
}
@@ -87,6 +90,11 @@ class CallbackClient
}
}
+ gpr_timespec NextIssueTime() {
+ std::lock_guard<std::mutex> l(next_issue_time_mu_);
+ return Client::NextIssueTime(0);
+ }
+
protected:
size_t num_threads_;
size_t total_outstanding_rpcs_;
@@ -108,6 +116,8 @@ class CallbackClient
}
private:
+ std::mutex next_issue_time_mu_; // Used by next issue time
+
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
@@ -146,7 +156,7 @@ class CallbackUnaryClient final : public CallbackClient {
bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
vector_idx += num_threads_) {
- ScheduleRpc(t, thread_idx, vector_idx);
+ ScheduleRpc(t, vector_idx);
}
return true;
}
@@ -154,26 +164,26 @@ class CallbackUnaryClient final : public CallbackClient {
void InitThreadFuncImpl(size_t thread_idx) override { return; }
private:
- void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
+ void ScheduleRpc(Thread* t, size_t vector_idx) {
if (!closed_loop_) {
- gpr_timespec next_issue_time = NextIssueTime(thread_idx);
+ gpr_timespec next_issue_time = NextIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_[vector_idx]->alarm_.experimental().Set(
- next_issue_time, [this, t, thread_idx, vector_idx](bool ok) {
- IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+ next_issue_time, [this, t, vector_idx](bool ok) {
+ IssueUnaryCallbackRpc(t, vector_idx);
});
} else {
- IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+ IssueUnaryCallbackRpc(t, vector_idx);
}
}
- void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
+ void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
double start = UsageTimer::Now();
ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
(&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
- [this, t, thread_idx, start, vector_idx](grpc::Status s) {
+ [this, t, start, vector_idx](grpc::Status s) {
// Update Histogram with data from the callback run
HistogramEntry entry;
if (s.ok()) {
@@ -190,7 +200,7 @@ class CallbackUnaryClient final : public CallbackClient {
ctx_[vector_idx].reset(
new CallbackClientRpcContext(ctx_[vector_idx]->stub_));
// Schedule a new RPC
- ScheduleRpc(t, thread_idx, vector_idx);
+ ScheduleRpc(t, vector_idx);
}
});
}
@@ -287,7 +297,7 @@ class CallbackStreamingPingPongReactor final
if (client_->ThreadCompleted()) return;
if (!client_->IsClosedLoop()) {
- gpr_timespec next_issue_time = client_->NextIssueTime(thread_idx_);
+ gpr_timespec next_issue_time = client_->NextIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_->alarm_.experimental().Set(next_issue_time,
@@ -298,11 +308,9 @@ class CallbackStreamingPingPongReactor final
}
void set_thread_ptr(void* ptr) { thread_ptr_ = ptr; }
- void set_thread_idx(int thread_idx) { thread_idx_ = thread_idx; }
CallbackStreamingPingPongClient* client_;
std::unique_ptr<CallbackClientRpcContext> ctx_;
- int thread_idx_; // Needed to update histogram entries
void* thread_ptr_; // Needed to update histogram entries
double start_; // Track message start time
int messages_issued_; // Messages issued by this stream
@@ -323,7 +331,6 @@ class CallbackStreamingPingPongClientImpl final
for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
vector_idx += num_threads_) {
reactor_[vector_idx]->set_thread_ptr(t);
- reactor_[vector_idx]->set_thread_idx(thread_idx);
reactor_[vector_idx]->ScheduleRpc();
}
return true;