aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_async.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r--test/cpp/qps/client_async.cc62
1 files changed, 39 insertions, 23 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index fbbc19f58d..06bc3ee9c2 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -57,9 +57,11 @@
namespace grpc {
namespace testing {
+typedef std::forward_list<grpc_time> deadline_list;
+
class ClientRpcContext {
public:
- ClientRpcContext() {}
+ ClientRpcContext(int ch): channel_id_(ch) {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;
@@ -72,6 +74,9 @@ class ClientRpcContext {
deadline_list::iterator deadline_posn() const {return deadline_posn_;}
void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;}
virtual void Start() = 0;
+ int channel_id() const {return channel_id_;}
+ protected:
+ int channel_id_;
private:
deadline_list::iterator deadline_posn_;
};
@@ -79,14 +84,14 @@ class ClientRpcContext {
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
- ClientRpcContextUnaryImpl(
+ ClientRpcContextUnaryImpl(int channel_id,
TestService::Stub* stub, const RequestType& req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&)>
start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
- : context_(),
+ : ClientRpcContext(channel_id), context_(),
stub_(stub),
req_(req),
response_(),
@@ -109,7 +114,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
}
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
- return new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_);
+ return new ClientRpcContextUnaryImpl(channel_id_,
+ stub_, req_, start_req_, callback_);
}
private:
@@ -135,15 +141,14 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_reader_;
};
-typedef std::forward_list<grpc_time> deadline_list;
typedef std::forward_list<ClientRpcContext *> context_list;
class AsyncClient : public Client {
public:
explicit AsyncClient(const ClientConfig& config,
- std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*,
+ std::function<ClientRpcContext*(int, CompletionQueue*, TestService::Stub*,
const SimpleRequest&)> setup_ctx) :
- Client(config), channel_rpc_lock_(config.client_channels()),
+ Client(config), channel_lock_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()) {
@@ -171,10 +176,10 @@ class AsyncClient : public Client {
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (int ch = 0; ch < channel_count_; ch++) {
- auto channel = channels_[ch];
+ auto& channel = channels_[ch];
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
- ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_);
+ auto ctx = setup_ctx(ch, cq, channel.get_stub(), request_);
if (closed_loop_) {
// only relevant for closed_loop unary, but harmless for
// closed_loop streaming
@@ -245,7 +250,10 @@ class AsyncClient : public Client {
delete ctx;
if (!closed_loop_) {
// Put this in the list of idle contexts for this channel
-
+ // Under lock
+ int ch = clone_ctx->channel_id();
+ std::lock_guard<std::mutex> g(channel_lock_[ch]);
+ contexts_[ch].push_front(ctx);
}
}
issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
@@ -259,7 +267,7 @@ class AsyncClient : public Client {
next_channel_[thread_idx] =
(next_channel_[thread_idx]+1)%channel_count_) {
std::lock_guard<std::mutex>
- g(channel_rpc_lock_[next_channel_[thread_idx]]);
+ g(channel_lock_[next_channel_[thread_idx]]);
if ((rpcs_outstanding_[next_channel_[thread_idx]] <
max_outstanding_per_channel_) &&
!contexts_[next_channel_[thread_idx]].empty()) {
@@ -267,7 +275,7 @@ class AsyncClient : public Client {
auto ctx = contexts_[next_channel_[thread_idx]].begin();
contexts_[next_channel_[thread_idx]].pop_front();
// do the work to issue
- ctx->Start();
+ (*ctx)->Start();
rpcs_outstanding_[next_channel_[thread_idx]]++;
issued = true;
}
@@ -286,7 +294,7 @@ class AsyncClient : public Client {
std::vector<bool> issue_allowed_; // may this thread attempt to issue
std::vector<grpc_time> next_issue_; // when should it issue?
- std::vector<std::mutex> channel_rpc_lock_;
+ std::vector<std::mutex> channel_lock_;
std::vector<int> rpcs_outstanding_; // per-channel vector
std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_;
@@ -301,15 +309,18 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
}
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
private:
- static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
- const SimpleRequest& req) {
+ static ClientRpcContext *SetupCtx(int channel_id,
+ CompletionQueue* cq,
+ TestService::Stub* stub,
+ const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
- return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- stub, req, start_req, check_done);
+ return new ClientRpcContextUnaryImpl<SimpleRequest,
+ SimpleResponse>(channel_id, stub, req,
+ start_req, check_done);
}
};
@@ -317,13 +328,14 @@ private:
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
- ClientRpcContextStreamingImpl(
+ ClientRpcContextStreamingImpl(int channel_id,
TestService::Stub* stub, const RequestType& req,
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
TestService::Stub*, grpc::ClientContext*, void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
- : context_(),
+ : ClientRpcContext(channel_id),
+ context_(),
stub_(stub),
req_(req),
response_(),
@@ -337,7 +349,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return (this->*next_state_)(ok, hist);
}
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
- return new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_);
+ return new ClientRpcContextStreamingImpl(channel_id_,
+ stub_, req_, start_req_, callback_);
}
void Start() GRPC_OVERRIDE {}
private:
@@ -387,7 +400,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
private:
- static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+ static ClientRpcContext *SetupCtx(int channel_id,
+ CompletionQueue* cq, TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
@@ -395,8 +409,10 @@ private:
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
- return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- stub, req, start_req, check_done);
+ return new ClientRpcContextStreamingImpl<SimpleRequest,
+ SimpleResponse>(channel_id, stub,
+ req, start_req,
+ check_done);
}
};